Source code for lsurf.utilities.recording_sphere

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
Recording Sphere for Ray Detection and Data Storage

This module implements a spherical detection surface for capturing rays
at a specified altitude above Earth's surface, along with HDF5/numpy
data storage for ray information.

The recording sphere captures:
- Position (x, y, z)
- Direction (dx, dy, dz)
- Time of arrival
- Intensity
- Wavelength
- Generation number
- All geometric information needed to fully reconstruct rays
"""

from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any

import numpy as np
from numpy.typing import NDArray

# Optional h5py import
try:
    import h5py

    HAS_H5PY = True
except ImportError:
    HAS_H5PY = False

from ..surfaces import EARTH_RADIUS
from .ray_data import RayBatch


[docs] @dataclass class RecordedRays: """ Container for recorded ray data at the detection sphere. All arrays have shape (N,) or (N, 3) where N is the number of recorded rays. Attributes ---------- positions : ndarray, shape (N, 3) Intersection positions on the recording sphere (meters) directions : ndarray, shape (N, 3) Ray directions at intersection (unit vectors) times : ndarray, shape (N,) Time of arrival at recording sphere (seconds) intensities : ndarray, shape (N,) Ray intensity at recording sphere wavelengths : ndarray, shape (N,) Ray wavelength (meters) generations : ndarray, shape (N,) Ray generation (number of surface interactions) polarization_vectors : ndarray, shape (N, 3), optional 3D polarization vectors (electric field direction) at intersection. Unit vectors perpendicular to ray direction representing E-field orientation. Derived quantities (computed on demand or at save time): elevation_angles : ndarray, shape (N,) Elevation angle above local horizon (radians) azimuth_angles : ndarray, shape (N,) Azimuth angle in local tangent plane (radians) zenith_angles : ndarray, shape (N,) Angle from local zenith (radians) local_positions : ndarray, shape (N, 2) Position in local tangent coordinates (meters) """ positions: NDArray[np.float32] directions: NDArray[np.float32] times: NDArray[np.float32] intensities: NDArray[np.float32] wavelengths: NDArray[np.float32] generations: NDArray[np.int32] polarization_vectors: NDArray[np.float32] | None = None ray_indices: NDArray[np.int32] | None = None @property def num_rays(self) -> int: """Number of recorded rays.""" return len(self.positions)
[docs] def compute_angular_coordinates( self, earth_center: NDArray[np.float64] = None, ) -> dict[str, NDArray[np.float32]]: """ Compute angular coordinates for all recorded ray intersection points. Computes spherical coordinates (latitude/longitude) of intersection points on the detection sphere relative to Earth's center. Parameters ---------- earth_center : ndarray, optional Earth center position, default (0, 0, -EARTH_RADIUS) Returns ------- dict Dictionary with: - 'elevation': Latitude angle above equator (radians, -π/2 to π/2) - 'azimuth': Longitude angle (radians, -π to π) - 'zenith': Zenith angle from north pole (radians, 0 to π) - 'incidence': Angle between ray direction and outward radial (radians) """ if earth_center is None: earth_center = np.array([0, 0, -EARTH_RADIUS], dtype=np.float64) # Vector from Earth center to intersection point to_pos = self.positions.astype(np.float64) - earth_center r = np.linalg.norm(to_pos, axis=1, keepdims=True) # Spherical coordinates of the intersection point # Elevation (latitude): angle above the equatorial plane (XY plane through Earth center) elevation = np.arcsin(to_pos[:, 2] / r.squeeze()) # Azimuth (longitude): angle in the XY plane azimuth = np.arctan2(to_pos[:, 1], to_pos[:, 0]) # Zenith angle: angle from +Z axis (north pole) np.arccos(to_pos[:, 2] / r.squeeze()) # Incidence angle: angle between ray direction and outward radial (for reference) radial = to_pos / r cos_incidence = np.sum(self.directions * radial, axis=1) incidence = np.arccos(np.clip(cos_incidence, -1.0, 1.0)) # For azimuth, project direction onto local tangent plane # Create local basis global_z = np.array([0, 0, 1], dtype=np.float64) tangent_x = np.cross(global_z, radial) tangent_x_norm = np.linalg.norm(tangent_x, axis=1, keepdims=True) tangent_x = tangent_x / np.maximum(tangent_x_norm, 1e-10) tangent_y = np.cross(radial, tangent_x) # Project direction onto tangent plane dir_x = np.sum(self.directions * tangent_x, axis=1) dir_y = np.sum(self.directions * tangent_y, axis=1) azimuth = np.arctan2(dir_y, dir_x) return { "elevation": elevation.astype(np.float32), "azimuth": azimuth.astype(np.float32), "zenith": incidence.astype(np.float32), "incidence": incidence.astype(np.float32), }
[docs] def compute_viewing_angle_from_origin( self, origin: NDArray[np.float64] = None, ) -> NDArray[np.float32]: """ Compute viewing angle from horizontal at specified origin. Calculates the angle above the horizontal plane (XY plane) when viewing each intersection point from the origin position. Parameters ---------- origin : ndarray, optional Observer position, default (0, 0, 0) - Earth surface at reference point Returns ------- ndarray Viewing angle from horizontal in radians (-π/2 to π/2) Positive angles are above horizontal, negative below """ if origin is None: origin = np.array([0, 0, 0], dtype=np.float64) # Vector from origin to intersection point to_point = self.positions.astype(np.float64) - origin # Horizontal distance (in XY plane) horiz_dist = np.sqrt(to_point[:, 0] ** 2 + to_point[:, 1] ** 2) # Vertical distance (Z component) vert_dist = to_point[:, 2] # Angle from horizontal: arctan(z / sqrt(x^2 + y^2)) viewing_angle = np.arctan2(vert_dist, horiz_dist) return viewing_angle.astype(np.float32)
[docs] def compute_ray_direction_angles(self) -> dict[str, NDArray[np.float32]]: """ Compute elevation and azimuth angles of ray directions. Calculates the angles of the ray direction vectors themselves, not the position coordinates. Useful for understanding the angular distribution of ray propagation. Returns ------- dict Dictionary with: - 'elevation': Angle above horizontal plane in radians (-π/2 to π/2) 0 = horizontal, π/2 = straight up, -π/2 = straight down - 'azimuth': Azimuth angle in horizontal plane in radians (-π to π) 0 = +X direction, π/2 = +Y direction """ # Elevation: angle from horizontal (XY) plane # elevation = arcsin(z_component) elevation = np.arcsin(np.clip(self.directions[:, 2], -1.0, 1.0)) # Azimuth: angle in XY plane # azimuth = arctan2(y, x) azimuth = np.arctan2(self.directions[:, 1], self.directions[:, 0]) return { "elevation": elevation.astype(np.float32), "azimuth": azimuth.astype(np.float32), }
[docs] class RecordingSphere: """ Spherical detection surface at a specified altitude above Earth. Records all rays that intersect the sphere, capturing full ray state for later analysis. Parameters ---------- altitude : float Altitude above Earth's surface in meters (default 33 km) earth_center : Tuple[float, float, float] Center of Earth, default (0, 0, -EARTH_RADIUS) earth_radius : float Earth radius in meters Notes ----- The recording sphere has radius = earth_radius + altitude, centered at earth_center. """
[docs] def __init__( self, altitude: float = 33000.0, # 33 km default earth_center: tuple[float, float, float] = (0, 0, -EARTH_RADIUS), earth_radius: float = EARTH_RADIUS, ): self.altitude = altitude self.earth_center = np.array(earth_center, dtype=np.float64) self.earth_radius = earth_radius self.sphere_radius = earth_radius + altitude
[docs] def detect( self, rays: RayBatch, compute_travel_time: bool = True, speed_of_light: float = 299792458.0, max_propagation_distance: float | None = None, ) -> RecordedRays: """ Detect rays intersecting the recording sphere. Parameters ---------- rays : RayBatch Rays to detect compute_travel_time : bool If True, add travel time to intersection to ray's accumulated time speed_of_light : float Speed of light for time computation max_propagation_distance : float, optional Maximum distance rays can propagate before detection (meters). If None, no limit is applied. Use this to prevent detecting rays that would hit the sphere from the opposite hemisphere. Returns ------- RecordedRays Recorded ray data for all intersecting rays """ active_mask = rays.active if not np.any(active_mask): return RecordedRays( positions=np.zeros((0, 3), dtype=np.float32), directions=np.zeros((0, 3), dtype=np.float32), times=np.zeros(0, dtype=np.float32), intensities=np.zeros(0, dtype=np.float32), wavelengths=np.zeros(0, dtype=np.float32), generations=np.zeros(0, dtype=np.int32), ) origins = rays.positions[active_mask].astype(np.float64) directions = rays.directions[active_mask].astype(np.float64) # Ray-sphere intersection using numerically stable formulation # Ray: P = O + t*D # Sphere: |P - C|² = R² # Using half_b = b/2 to reduce magnitude and improve numerical stability oc = origins - self.earth_center a = np.sum(directions * directions, axis=1) half_b = np.sum(directions * oc, axis=1) # This is b/2 c = np.sum(oc * oc, axis=1) - self.sphere_radius**2 # Discriminant using half_b: (b/2)² - ac instead of b² - 4ac discriminant = half_b**2 - a * c has_hit = discriminant >= 0 # Get intersection distance using stable formula sqrt_disc = np.sqrt(np.maximum(discriminant, 0)) # t = (-b ± sqrt(b²-4ac)) / 2a = (-half_b ± sqrt(half_b² - ac)) / a t1 = (-half_b - sqrt_disc) / (a + 1e-20) t2 = (-half_b + sqrt_disc) / (a + 1e-20) # For rays inside the sphere going outward, use t2 (far intersection) # For rays outside going inward, use t1 (near intersection) # Check if origin is inside or outside sphere dist_from_center = np.linalg.norm(oc, axis=1) inside = dist_from_center < self.sphere_radius t = np.where(inside, t2, t1) t = np.where(t > 0, t, t2) # If first choice was negative, try second # Valid hits: positive t, discriminant >= 0, and within max distance valid_hits = has_hit & (t > 1e-6) if max_propagation_distance is not None: valid_hits = valid_hits & (t < max_propagation_distance) if not np.any(valid_hits): return RecordedRays( positions=np.zeros((0, 3), dtype=np.float32), directions=np.zeros((0, 3), dtype=np.float32), times=np.zeros(0, dtype=np.float32), intensities=np.zeros(0, dtype=np.float32), wavelengths=np.zeros(0, dtype=np.float32), generations=np.zeros(0, dtype=np.int32), ) # Compute intersection positions hit_positions = ( origins[valid_hits] + t[valid_hits, np.newaxis] * directions[valid_hits] ) hit_directions = directions[valid_hits] hit_distances = t[valid_hits] # Get other ray properties active_indices = np.where(active_mask)[0] hit_indices = active_indices[valid_hits] hit_intensities = rays.intensities[hit_indices] hit_wavelengths = rays.wavelengths[hit_indices] hit_generations = rays.generations[hit_indices] hit_times = rays.accumulated_time[hit_indices] # Add travel time if compute_travel_time: travel_time = hit_distances / speed_of_light hit_times = hit_times + travel_time.astype(np.float32) # Get polarization vectors if available hit_polarization_vectors = None if rays.polarization_vector is not None: hit_polarization_vectors = rays.polarization_vector[hit_indices] return RecordedRays( positions=hit_positions.astype(np.float32), directions=hit_directions.astype(np.float32), times=hit_times, intensities=hit_intensities, wavelengths=hit_wavelengths, generations=hit_generations, polarization_vectors=hit_polarization_vectors, )
[docs] def save_recorded_rays_hdf5( recorded_rays: RecordedRays, filepath: str, metadata: dict[str, Any] | None = None, compression: str = "gzip", ) -> None: """ Save recorded rays to HDF5 file. Parameters ---------- recorded_rays : RecordedRays Ray data to save filepath : str Output file path metadata : dict, optional Additional metadata to store (simulation parameters, etc.) compression : str Compression algorithm ('gzip', 'lzf', or None) """ if not HAS_H5PY: raise ImportError( "h5py is required for HDF5 support. Install with: pip install h5py" ) filepath = Path(filepath) filepath.parent.mkdir(parents=True, exist_ok=True) with h5py.File(filepath, "w") as f: # Create rays group rays_grp = f.create_group("rays") # Store ray data with compression rays_grp.create_dataset( "positions", data=recorded_rays.positions, compression=compression ) rays_grp.create_dataset( "directions", data=recorded_rays.directions, compression=compression ) rays_grp.create_dataset( "times", data=recorded_rays.times, compression=compression ) rays_grp.create_dataset( "intensities", data=recorded_rays.intensities, compression=compression ) rays_grp.create_dataset( "wavelengths", data=recorded_rays.wavelengths, compression=compression ) rays_grp.create_dataset( "generations", data=recorded_rays.generations, compression=compression ) # Save polarization vectors if available if recorded_rays.polarization_vectors is not None: rays_grp.create_dataset( "polarization_vectors", data=recorded_rays.polarization_vectors, compression=compression, ) # Compute and store angular coordinates angular = recorded_rays.compute_angular_coordinates() angular_grp = f.create_group("angular") for key, value in angular.items(): angular_grp.create_dataset(key, data=value, compression=compression) # Store metadata meta_grp = f.create_group("metadata") meta_grp.attrs["num_rays"] = recorded_rays.num_rays meta_grp.attrs["save_time"] = datetime.now().isoformat() if metadata is not None: for key, value in metadata.items(): if isinstance(value, (int, float, str, bool)): meta_grp.attrs[key] = value elif isinstance(value, np.ndarray): meta_grp.create_dataset(key, data=value) elif isinstance(value, (list, tuple)): meta_grp.create_dataset(key, data=np.array(value)) else: meta_grp.attrs[key] = str(value)
[docs] def load_recorded_rays_hdf5(filepath: str) -> tuple[RecordedRays, dict[str, Any]]: """ Load recorded rays from HDF5 file. Parameters ---------- filepath : str Input file path Returns ------- recorded_rays : RecordedRays Loaded ray data metadata : dict Loaded metadata """ if not HAS_H5PY: raise ImportError( "h5py is required for HDF5 support. Install with: pip install h5py" ) with h5py.File(filepath, "r") as f: rays_grp = f["rays"] # Load polarization vectors if available polarization_vectors = None if "polarization_vectors" in rays_grp: polarization_vectors = rays_grp["polarization_vectors"][...] recorded_rays = RecordedRays( positions=rays_grp["positions"][...], directions=rays_grp["directions"][...], times=rays_grp["times"][...], intensities=rays_grp["intensities"][...], wavelengths=rays_grp["wavelengths"][...], generations=rays_grp["generations"][...], polarization_vectors=polarization_vectors, ) metadata = {} if "metadata" in f: meta_grp = f["metadata"] for key, value in meta_grp.attrs.items(): metadata[key] = value for key in meta_grp.keys(): metadata[key] = meta_grp[key][...] return recorded_rays, metadata
[docs] def save_recorded_rays_numpy( recorded_rays: RecordedRays, filepath: str, metadata: dict[str, Any] | None = None, ) -> None: """ Save recorded rays to numpy .npz file. Parameters ---------- recorded_rays : RecordedRays Ray data to save filepath : str Output file path metadata : dict, optional Additional metadata to store """ filepath = Path(filepath) filepath.parent.mkdir(parents=True, exist_ok=True) # Compute angular coordinates angular = recorded_rays.compute_angular_coordinates() # Prepare save dict save_dict = { "positions": recorded_rays.positions, "directions": recorded_rays.directions, "times": recorded_rays.times, "intensities": recorded_rays.intensities, "wavelengths": recorded_rays.wavelengths, "generations": recorded_rays.generations, "elevation": angular["elevation"], "azimuth": angular["azimuth"], "zenith": angular["zenith"], } # Save polarization vectors if available if recorded_rays.polarization_vectors is not None: save_dict["polarization_vectors"] = recorded_rays.polarization_vectors # Add metadata as arrays or scalars if metadata is not None: for key, value in metadata.items(): save_dict[f"meta_{key}"] = np.array(value) np.savez_compressed(filepath, **save_dict)
[docs] def load_recorded_rays_numpy(filepath: str) -> tuple[RecordedRays, dict[str, Any]]: """ Load recorded rays from numpy .npz file. Parameters ---------- filepath : str Input file path Returns ------- recorded_rays : RecordedRays Loaded ray data metadata : dict Loaded metadata """ data = np.load(filepath) # Load polarization vectors if available polarization_vectors = None if "polarization_vectors" in data.files: polarization_vectors = data["polarization_vectors"] recorded_rays = RecordedRays( positions=data["positions"], directions=data["directions"], times=data["times"], intensities=data["intensities"], wavelengths=data["wavelengths"], generations=data["generations"], polarization_vectors=polarization_vectors, ) metadata = {} for key in data.files: if key.startswith("meta_"): metadata[key[5:]] = data[key] return recorded_rays, metadata
[docs] class LocalRecordingSphere: """ Simple spherical detection surface centered at origin. Records all rays that intersect the sphere from inside, useful for local-scale simulations without Earth curvature. Parameters ---------- radius : float Sphere radius in meters (default 33 km) center : tuple Center position (default (0, 0, 0)) """
[docs] def __init__(self, radius: float = 33000.0, center=(0, 0, 0)): self.radius = radius self.center = np.array(center, dtype=np.float64) self.sphere_radius = radius # For compatibility with RecordingSphere
[docs] def record_rays(self, rays: RayBatch) -> RecordedRays: """ Record rays that intersect the sphere. Parameters ---------- rays : RayBatch Rays to check for intersections Returns ------- RecordedRays Recorded ray data """ # Vector from ray origins to sphere center oc = rays.positions - self.center # Quadratic equation coefficients for ray-sphere intersection # (origin + t*direction - center)^2 = radius^2 a = np.sum(rays.directions**2, axis=1) b = 2 * np.sum(oc * rays.directions, axis=1) c = np.sum(oc**2, axis=1) - self.radius**2 discriminant = b**2 - 4 * a * c # Find rays that hit the sphere hit_mask = discriminant >= 0 if np.sum(hit_mask) == 0: # No intersections return RecordedRays( positions=np.empty((0, 3), dtype=np.float32), directions=np.empty((0, 3), dtype=np.float32), times=np.empty(0, dtype=np.float32), intensities=np.empty(0, dtype=np.float32), wavelengths=np.empty(0, dtype=np.float32), generations=np.empty(0, dtype=np.int32), ) # Calculate intersection distances sqrt_disc = np.sqrt(discriminant[hit_mask]) t1 = (-b[hit_mask] - sqrt_disc) / (2 * a[hit_mask]) t2 = (-b[hit_mask] + sqrt_disc) / (2 * a[hit_mask]) # Take the positive intersection (forward along ray) # For rays inside the sphere, take t2 (exit point) t = np.where(t1 > 0, t1, t2) valid = t > 0 if np.sum(valid) == 0: return RecordedRays( positions=np.empty((0, 3), dtype=np.float32), directions=np.empty((0, 3), dtype=np.float32), times=np.empty(0, dtype=np.float32), intensities=np.empty(0, dtype=np.float32), wavelengths=np.empty(0, dtype=np.float32), generations=np.empty(0, dtype=np.int32), ) # Extract hit rays final_mask = np.zeros(len(hit_mask), dtype=bool) final_mask[np.where(hit_mask)[0][valid]] = True hit_positions = rays.positions[final_mask] hit_directions = rays.directions[final_mask] hit_intensities = rays.intensities[final_mask] hit_times = rays.accumulated_time[final_mask] hit_wavelengths = rays.wavelengths[final_mask] hit_generations = rays.generations[final_mask] t_final = t[valid] # Calculate intersection positions intersection_positions = hit_positions + t_final[:, np.newaxis] * hit_directions # Update times (distance / speed of light) c_light = 299792458.0 # m/s arrival_times = hit_times + (t_final / c_light).astype(np.float32) # Get polarization vectors if available hit_polarization_vectors = None if rays.polarization_vector is not None: hit_polarization_vectors = rays.polarization_vector[final_mask] return RecordedRays( positions=intersection_positions.astype(np.float32), directions=hit_directions.astype(np.float32), times=arrival_times, intensities=hit_intensities, wavelengths=hit_wavelengths, generations=hit_generations, polarization_vectors=hit_polarization_vectors, )