# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Recording Sphere for Ray Detection and Data Storage
This module implements a spherical detection surface for capturing rays
at a specified altitude above Earth's surface, along with HDF5/numpy
data storage for ray information.
The recording sphere captures:
- Position (x, y, z)
- Direction (dx, dy, dz)
- Time of arrival
- Intensity
- Wavelength
- Generation number
- All geometric information needed to fully reconstruct rays
"""
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
from numpy.typing import NDArray
# Optional h5py import
try:
import h5py
HAS_H5PY = True
except ImportError:
HAS_H5PY = False
from ..surfaces import EARTH_RADIUS
from .ray_data import RayBatch
[docs]
@dataclass
class RecordedRays:
"""
Container for recorded ray data at the detection sphere.
All arrays have shape (N,) or (N, 3) where N is the number of recorded rays.
Attributes
----------
positions : ndarray, shape (N, 3)
Intersection positions on the recording sphere (meters)
directions : ndarray, shape (N, 3)
Ray directions at intersection (unit vectors)
times : ndarray, shape (N,)
Time of arrival at recording sphere (seconds)
intensities : ndarray, shape (N,)
Ray intensity at recording sphere
wavelengths : ndarray, shape (N,)
Ray wavelength (meters)
generations : ndarray, shape (N,)
Ray generation (number of surface interactions)
polarization_vectors : ndarray, shape (N, 3), optional
3D polarization vectors (electric field direction) at intersection.
Unit vectors perpendicular to ray direction representing E-field orientation.
Derived quantities (computed on demand or at save time):
elevation_angles : ndarray, shape (N,)
Elevation angle above local horizon (radians)
azimuth_angles : ndarray, shape (N,)
Azimuth angle in local tangent plane (radians)
zenith_angles : ndarray, shape (N,)
Angle from local zenith (radians)
local_positions : ndarray, shape (N, 2)
Position in local tangent coordinates (meters)
"""
positions: NDArray[np.float32]
directions: NDArray[np.float32]
times: NDArray[np.float32]
intensities: NDArray[np.float32]
wavelengths: NDArray[np.float32]
generations: NDArray[np.int32]
polarization_vectors: NDArray[np.float32] | None = None
ray_indices: NDArray[np.int32] | None = None
@property
def num_rays(self) -> int:
"""Number of recorded rays."""
return len(self.positions)
[docs]
def compute_angular_coordinates(
self,
earth_center: NDArray[np.float64] = None,
) -> dict[str, NDArray[np.float32]]:
"""
Compute angular coordinates for all recorded ray intersection points.
Computes spherical coordinates (latitude/longitude) of intersection points
on the detection sphere relative to Earth's center.
Parameters
----------
earth_center : ndarray, optional
Earth center position, default (0, 0, -EARTH_RADIUS)
Returns
-------
dict
Dictionary with:
- 'elevation': Latitude angle above equator (radians, -π/2 to π/2)
- 'azimuth': Longitude angle (radians, -π to π)
- 'zenith': Zenith angle from north pole (radians, 0 to π)
- 'incidence': Angle between ray direction and outward radial (radians)
"""
if earth_center is None:
earth_center = np.array([0, 0, -EARTH_RADIUS], dtype=np.float64)
# Vector from Earth center to intersection point
to_pos = self.positions.astype(np.float64) - earth_center
r = np.linalg.norm(to_pos, axis=1, keepdims=True)
# Spherical coordinates of the intersection point
# Elevation (latitude): angle above the equatorial plane (XY plane through Earth center)
elevation = np.arcsin(to_pos[:, 2] / r.squeeze())
# Azimuth (longitude): angle in the XY plane
azimuth = np.arctan2(to_pos[:, 1], to_pos[:, 0])
# Zenith angle: angle from +Z axis (north pole)
np.arccos(to_pos[:, 2] / r.squeeze())
# Incidence angle: angle between ray direction and outward radial (for reference)
radial = to_pos / r
cos_incidence = np.sum(self.directions * radial, axis=1)
incidence = np.arccos(np.clip(cos_incidence, -1.0, 1.0))
# For azimuth, project direction onto local tangent plane
# Create local basis
global_z = np.array([0, 0, 1], dtype=np.float64)
tangent_x = np.cross(global_z, radial)
tangent_x_norm = np.linalg.norm(tangent_x, axis=1, keepdims=True)
tangent_x = tangent_x / np.maximum(tangent_x_norm, 1e-10)
tangent_y = np.cross(radial, tangent_x)
# Project direction onto tangent plane
dir_x = np.sum(self.directions * tangent_x, axis=1)
dir_y = np.sum(self.directions * tangent_y, axis=1)
azimuth = np.arctan2(dir_y, dir_x)
return {
"elevation": elevation.astype(np.float32),
"azimuth": azimuth.astype(np.float32),
"zenith": incidence.astype(np.float32),
"incidence": incidence.astype(np.float32),
}
[docs]
def compute_viewing_angle_from_origin(
self,
origin: NDArray[np.float64] = None,
) -> NDArray[np.float32]:
"""
Compute viewing angle from horizontal at specified origin.
Calculates the angle above the horizontal plane (XY plane) when
viewing each intersection point from the origin position.
Parameters
----------
origin : ndarray, optional
Observer position, default (0, 0, 0) - Earth surface at reference point
Returns
-------
ndarray
Viewing angle from horizontal in radians (-π/2 to π/2)
Positive angles are above horizontal, negative below
"""
if origin is None:
origin = np.array([0, 0, 0], dtype=np.float64)
# Vector from origin to intersection point
to_point = self.positions.astype(np.float64) - origin
# Horizontal distance (in XY plane)
horiz_dist = np.sqrt(to_point[:, 0] ** 2 + to_point[:, 1] ** 2)
# Vertical distance (Z component)
vert_dist = to_point[:, 2]
# Angle from horizontal: arctan(z / sqrt(x^2 + y^2))
viewing_angle = np.arctan2(vert_dist, horiz_dist)
return viewing_angle.astype(np.float32)
[docs]
def compute_ray_direction_angles(self) -> dict[str, NDArray[np.float32]]:
"""
Compute elevation and azimuth angles of ray directions.
Calculates the angles of the ray direction vectors themselves,
not the position coordinates. Useful for understanding the
angular distribution of ray propagation.
Returns
-------
dict
Dictionary with:
- 'elevation': Angle above horizontal plane in radians (-π/2 to π/2)
0 = horizontal, π/2 = straight up, -π/2 = straight down
- 'azimuth': Azimuth angle in horizontal plane in radians (-π to π)
0 = +X direction, π/2 = +Y direction
"""
# Elevation: angle from horizontal (XY) plane
# elevation = arcsin(z_component)
elevation = np.arcsin(np.clip(self.directions[:, 2], -1.0, 1.0))
# Azimuth: angle in XY plane
# azimuth = arctan2(y, x)
azimuth = np.arctan2(self.directions[:, 1], self.directions[:, 0])
return {
"elevation": elevation.astype(np.float32),
"azimuth": azimuth.astype(np.float32),
}
[docs]
class RecordingSphere:
"""
Spherical detection surface at a specified altitude above Earth.
Records all rays that intersect the sphere, capturing full ray state
for later analysis.
Parameters
----------
altitude : float
Altitude above Earth's surface in meters (default 33 km)
earth_center : Tuple[float, float, float]
Center of Earth, default (0, 0, -EARTH_RADIUS)
earth_radius : float
Earth radius in meters
Notes
-----
The recording sphere has radius = earth_radius + altitude, centered
at earth_center.
"""
[docs]
def __init__(
self,
altitude: float = 33000.0, # 33 km default
earth_center: tuple[float, float, float] = (0, 0, -EARTH_RADIUS),
earth_radius: float = EARTH_RADIUS,
):
self.altitude = altitude
self.earth_center = np.array(earth_center, dtype=np.float64)
self.earth_radius = earth_radius
self.sphere_radius = earth_radius + altitude
[docs]
def detect(
self,
rays: RayBatch,
compute_travel_time: bool = True,
speed_of_light: float = 299792458.0,
max_propagation_distance: float | None = None,
) -> RecordedRays:
"""
Detect rays intersecting the recording sphere.
Parameters
----------
rays : RayBatch
Rays to detect
compute_travel_time : bool
If True, add travel time to intersection to ray's accumulated time
speed_of_light : float
Speed of light for time computation
max_propagation_distance : float, optional
Maximum distance rays can propagate before detection (meters).
If None, no limit is applied. Use this to prevent detecting rays
that would hit the sphere from the opposite hemisphere.
Returns
-------
RecordedRays
Recorded ray data for all intersecting rays
"""
active_mask = rays.active
if not np.any(active_mask):
return RecordedRays(
positions=np.zeros((0, 3), dtype=np.float32),
directions=np.zeros((0, 3), dtype=np.float32),
times=np.zeros(0, dtype=np.float32),
intensities=np.zeros(0, dtype=np.float32),
wavelengths=np.zeros(0, dtype=np.float32),
generations=np.zeros(0, dtype=np.int32),
)
origins = rays.positions[active_mask].astype(np.float64)
directions = rays.directions[active_mask].astype(np.float64)
# Ray-sphere intersection using numerically stable formulation
# Ray: P = O + t*D
# Sphere: |P - C|² = R²
# Using half_b = b/2 to reduce magnitude and improve numerical stability
oc = origins - self.earth_center
a = np.sum(directions * directions, axis=1)
half_b = np.sum(directions * oc, axis=1) # This is b/2
c = np.sum(oc * oc, axis=1) - self.sphere_radius**2
# Discriminant using half_b: (b/2)² - ac instead of b² - 4ac
discriminant = half_b**2 - a * c
has_hit = discriminant >= 0
# Get intersection distance using stable formula
sqrt_disc = np.sqrt(np.maximum(discriminant, 0))
# t = (-b ± sqrt(b²-4ac)) / 2a = (-half_b ± sqrt(half_b² - ac)) / a
t1 = (-half_b - sqrt_disc) / (a + 1e-20)
t2 = (-half_b + sqrt_disc) / (a + 1e-20)
# For rays inside the sphere going outward, use t2 (far intersection)
# For rays outside going inward, use t1 (near intersection)
# Check if origin is inside or outside sphere
dist_from_center = np.linalg.norm(oc, axis=1)
inside = dist_from_center < self.sphere_radius
t = np.where(inside, t2, t1)
t = np.where(t > 0, t, t2) # If first choice was negative, try second
# Valid hits: positive t, discriminant >= 0, and within max distance
valid_hits = has_hit & (t > 1e-6)
if max_propagation_distance is not None:
valid_hits = valid_hits & (t < max_propagation_distance)
if not np.any(valid_hits):
return RecordedRays(
positions=np.zeros((0, 3), dtype=np.float32),
directions=np.zeros((0, 3), dtype=np.float32),
times=np.zeros(0, dtype=np.float32),
intensities=np.zeros(0, dtype=np.float32),
wavelengths=np.zeros(0, dtype=np.float32),
generations=np.zeros(0, dtype=np.int32),
)
# Compute intersection positions
hit_positions = (
origins[valid_hits] + t[valid_hits, np.newaxis] * directions[valid_hits]
)
hit_directions = directions[valid_hits]
hit_distances = t[valid_hits]
# Get other ray properties
active_indices = np.where(active_mask)[0]
hit_indices = active_indices[valid_hits]
hit_intensities = rays.intensities[hit_indices]
hit_wavelengths = rays.wavelengths[hit_indices]
hit_generations = rays.generations[hit_indices]
hit_times = rays.accumulated_time[hit_indices]
# Add travel time
if compute_travel_time:
travel_time = hit_distances / speed_of_light
hit_times = hit_times + travel_time.astype(np.float32)
# Get polarization vectors if available
hit_polarization_vectors = None
if rays.polarization_vector is not None:
hit_polarization_vectors = rays.polarization_vector[hit_indices]
return RecordedRays(
positions=hit_positions.astype(np.float32),
directions=hit_directions.astype(np.float32),
times=hit_times,
intensities=hit_intensities,
wavelengths=hit_wavelengths,
generations=hit_generations,
polarization_vectors=hit_polarization_vectors,
)
[docs]
def save_recorded_rays_hdf5(
recorded_rays: RecordedRays,
filepath: str,
metadata: dict[str, Any] | None = None,
compression: str = "gzip",
) -> None:
"""
Save recorded rays to HDF5 file.
Parameters
----------
recorded_rays : RecordedRays
Ray data to save
filepath : str
Output file path
metadata : dict, optional
Additional metadata to store (simulation parameters, etc.)
compression : str
Compression algorithm ('gzip', 'lzf', or None)
"""
if not HAS_H5PY:
raise ImportError(
"h5py is required for HDF5 support. Install with: pip install h5py"
)
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
with h5py.File(filepath, "w") as f:
# Create rays group
rays_grp = f.create_group("rays")
# Store ray data with compression
rays_grp.create_dataset(
"positions", data=recorded_rays.positions, compression=compression
)
rays_grp.create_dataset(
"directions", data=recorded_rays.directions, compression=compression
)
rays_grp.create_dataset(
"times", data=recorded_rays.times, compression=compression
)
rays_grp.create_dataset(
"intensities", data=recorded_rays.intensities, compression=compression
)
rays_grp.create_dataset(
"wavelengths", data=recorded_rays.wavelengths, compression=compression
)
rays_grp.create_dataset(
"generations", data=recorded_rays.generations, compression=compression
)
# Save polarization vectors if available
if recorded_rays.polarization_vectors is not None:
rays_grp.create_dataset(
"polarization_vectors",
data=recorded_rays.polarization_vectors,
compression=compression,
)
# Compute and store angular coordinates
angular = recorded_rays.compute_angular_coordinates()
angular_grp = f.create_group("angular")
for key, value in angular.items():
angular_grp.create_dataset(key, data=value, compression=compression)
# Store metadata
meta_grp = f.create_group("metadata")
meta_grp.attrs["num_rays"] = recorded_rays.num_rays
meta_grp.attrs["save_time"] = datetime.now().isoformat()
if metadata is not None:
for key, value in metadata.items():
if isinstance(value, (int, float, str, bool)):
meta_grp.attrs[key] = value
elif isinstance(value, np.ndarray):
meta_grp.create_dataset(key, data=value)
elif isinstance(value, (list, tuple)):
meta_grp.create_dataset(key, data=np.array(value))
else:
meta_grp.attrs[key] = str(value)
[docs]
def load_recorded_rays_hdf5(filepath: str) -> tuple[RecordedRays, dict[str, Any]]:
"""
Load recorded rays from HDF5 file.
Parameters
----------
filepath : str
Input file path
Returns
-------
recorded_rays : RecordedRays
Loaded ray data
metadata : dict
Loaded metadata
"""
if not HAS_H5PY:
raise ImportError(
"h5py is required for HDF5 support. Install with: pip install h5py"
)
with h5py.File(filepath, "r") as f:
rays_grp = f["rays"]
# Load polarization vectors if available
polarization_vectors = None
if "polarization_vectors" in rays_grp:
polarization_vectors = rays_grp["polarization_vectors"][...]
recorded_rays = RecordedRays(
positions=rays_grp["positions"][...],
directions=rays_grp["directions"][...],
times=rays_grp["times"][...],
intensities=rays_grp["intensities"][...],
wavelengths=rays_grp["wavelengths"][...],
generations=rays_grp["generations"][...],
polarization_vectors=polarization_vectors,
)
metadata = {}
if "metadata" in f:
meta_grp = f["metadata"]
for key, value in meta_grp.attrs.items():
metadata[key] = value
for key in meta_grp.keys():
metadata[key] = meta_grp[key][...]
return recorded_rays, metadata
[docs]
def save_recorded_rays_numpy(
recorded_rays: RecordedRays,
filepath: str,
metadata: dict[str, Any] | None = None,
) -> None:
"""
Save recorded rays to numpy .npz file.
Parameters
----------
recorded_rays : RecordedRays
Ray data to save
filepath : str
Output file path
metadata : dict, optional
Additional metadata to store
"""
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
# Compute angular coordinates
angular = recorded_rays.compute_angular_coordinates()
# Prepare save dict
save_dict = {
"positions": recorded_rays.positions,
"directions": recorded_rays.directions,
"times": recorded_rays.times,
"intensities": recorded_rays.intensities,
"wavelengths": recorded_rays.wavelengths,
"generations": recorded_rays.generations,
"elevation": angular["elevation"],
"azimuth": angular["azimuth"],
"zenith": angular["zenith"],
}
# Save polarization vectors if available
if recorded_rays.polarization_vectors is not None:
save_dict["polarization_vectors"] = recorded_rays.polarization_vectors
# Add metadata as arrays or scalars
if metadata is not None:
for key, value in metadata.items():
save_dict[f"meta_{key}"] = np.array(value)
np.savez_compressed(filepath, **save_dict)
[docs]
def load_recorded_rays_numpy(filepath: str) -> tuple[RecordedRays, dict[str, Any]]:
"""
Load recorded rays from numpy .npz file.
Parameters
----------
filepath : str
Input file path
Returns
-------
recorded_rays : RecordedRays
Loaded ray data
metadata : dict
Loaded metadata
"""
data = np.load(filepath)
# Load polarization vectors if available
polarization_vectors = None
if "polarization_vectors" in data.files:
polarization_vectors = data["polarization_vectors"]
recorded_rays = RecordedRays(
positions=data["positions"],
directions=data["directions"],
times=data["times"],
intensities=data["intensities"],
wavelengths=data["wavelengths"],
generations=data["generations"],
polarization_vectors=polarization_vectors,
)
metadata = {}
for key in data.files:
if key.startswith("meta_"):
metadata[key[5:]] = data[key]
return recorded_rays, metadata
[docs]
class LocalRecordingSphere:
"""
Simple spherical detection surface centered at origin.
Records all rays that intersect the sphere from inside, useful for
local-scale simulations without Earth curvature.
Parameters
----------
radius : float
Sphere radius in meters (default 33 km)
center : tuple
Center position (default (0, 0, 0))
"""
[docs]
def __init__(self, radius: float = 33000.0, center=(0, 0, 0)):
self.radius = radius
self.center = np.array(center, dtype=np.float64)
self.sphere_radius = radius # For compatibility with RecordingSphere
[docs]
def record_rays(self, rays: RayBatch) -> RecordedRays:
"""
Record rays that intersect the sphere.
Parameters
----------
rays : RayBatch
Rays to check for intersections
Returns
-------
RecordedRays
Recorded ray data
"""
# Vector from ray origins to sphere center
oc = rays.positions - self.center
# Quadratic equation coefficients for ray-sphere intersection
# (origin + t*direction - center)^2 = radius^2
a = np.sum(rays.directions**2, axis=1)
b = 2 * np.sum(oc * rays.directions, axis=1)
c = np.sum(oc**2, axis=1) - self.radius**2
discriminant = b**2 - 4 * a * c
# Find rays that hit the sphere
hit_mask = discriminant >= 0
if np.sum(hit_mask) == 0:
# No intersections
return RecordedRays(
positions=np.empty((0, 3), dtype=np.float32),
directions=np.empty((0, 3), dtype=np.float32),
times=np.empty(0, dtype=np.float32),
intensities=np.empty(0, dtype=np.float32),
wavelengths=np.empty(0, dtype=np.float32),
generations=np.empty(0, dtype=np.int32),
)
# Calculate intersection distances
sqrt_disc = np.sqrt(discriminant[hit_mask])
t1 = (-b[hit_mask] - sqrt_disc) / (2 * a[hit_mask])
t2 = (-b[hit_mask] + sqrt_disc) / (2 * a[hit_mask])
# Take the positive intersection (forward along ray)
# For rays inside the sphere, take t2 (exit point)
t = np.where(t1 > 0, t1, t2)
valid = t > 0
if np.sum(valid) == 0:
return RecordedRays(
positions=np.empty((0, 3), dtype=np.float32),
directions=np.empty((0, 3), dtype=np.float32),
times=np.empty(0, dtype=np.float32),
intensities=np.empty(0, dtype=np.float32),
wavelengths=np.empty(0, dtype=np.float32),
generations=np.empty(0, dtype=np.int32),
)
# Extract hit rays
final_mask = np.zeros(len(hit_mask), dtype=bool)
final_mask[np.where(hit_mask)[0][valid]] = True
hit_positions = rays.positions[final_mask]
hit_directions = rays.directions[final_mask]
hit_intensities = rays.intensities[final_mask]
hit_times = rays.accumulated_time[final_mask]
hit_wavelengths = rays.wavelengths[final_mask]
hit_generations = rays.generations[final_mask]
t_final = t[valid]
# Calculate intersection positions
intersection_positions = hit_positions + t_final[:, np.newaxis] * hit_directions
# Update times (distance / speed of light)
c_light = 299792458.0 # m/s
arrival_times = hit_times + (t_final / c_light).astype(np.float32)
# Get polarization vectors if available
hit_polarization_vectors = None
if rays.polarization_vector is not None:
hit_polarization_vectors = rays.polarization_vector[final_mask]
return RecordedRays(
positions=intersection_positions.astype(np.float32),
directions=hit_directions.astype(np.float32),
times=arrival_times,
intensities=hit_intensities,
wavelengths=hit_wavelengths,
generations=hit_generations,
polarization_vectors=hit_polarization_vectors,
)