import math
from typing import Optional, TypedDict
import cupy as cp
import cupy.typing as cpt
from numba import cuda
import numpy as np
import numpy.typing as npt
from ..consts import detector, ff_induction
# Type definitions
# ((x_min, x_max), (y_min, y_max), (z_min, z_max))
Bounds = tuple[tuple[float, float], tuple[float, float], tuple[float, float]]
[docs]
class VoxelDict(TypedDict):
x: Optional[cpt.NDArray[cp.float32]]
y: Optional[cpt.NDArray[cp.float32]]
z: Optional[cpt.NDArray[cp.float32]]
q: Optional[cpt.NDArray[cp.float32]]
[docs]
def voxel_id_to_coordinates(
voxel_indices: cpt.NDArray[cp.int32],
grid_shape: tuple[int, int, int],
voxel_size: tuple[float, float, float],
bounds: Bounds
):
"""
Convert flattened voxel indices to physical (x, y, z) center coordinates.
Args:
voxel_indices: 1D array of flattened voxel indices
grid_shape: (nx, ny, nz) tuple
voxel_size: (dx, dy, dz) tuple in cm
bounds: ((x_min, x_max), (y_min, y_max), (z_min, z_max)) in cm
Returns:
(x_centers, y_centers, z_centers): NumPy arrays of voxel center coordinates in cm
"""
# Convert CuPy to NumPy if needed
if hasattr(voxel_indices, 'get'):
voxel_indices = cp.asnumpy(voxel_indices)
nx, ny, _ = grid_shape
x_min, y_min, z_min = bounds[0][0], bounds[1][0], bounds[2][0]
dx, dy, dz = voxel_size
# Decode flattened indices
iz = voxel_indices // (nx * ny)
rem = voxel_indices % (nx * ny)
iy = rem // nx
ix = rem % nx
# Compute voxel center coordinates
x_centers = x_min + (ix + 0.5) * dx
y_centers = y_min + (iy + 0.5) * dy
z_centers = z_min + (iz + 0.5) * dz
return x_centers, y_centers, z_centers
[docs]
def gpu_voxelize(
tracks: cpt.NDArray,
tpc_borders: Optional[npt.NDArray]=None,
voxel_size: Optional[float]=None,
z_padding: float=0.0
) -> tuple[cpt.NDArray[cp.int32], cpt.NDArray[cp.float32],
tuple[float, float, float], float, Bounds]:
"""
Voxelize track segments on GPU.
Args:
tracks: structured track array (fields: x_start, y_start, z_start,
x_end, y_end, z_end, n_electrons, pixel_plane, ...)
tpc_borders: TPC borders override (optional; defaults to
detector.TPC_BORDERS)
voxel_size: Voxel size override (optional; defaults to using voxel size
from ff_induction)
z_padding: Extra padding on the range of drift coordinates to cover
Returns:
(voxel_indices, voxel_charges, grid_shape, voxel_size, bounds) where
voxel_indices & voxel_charges are 1D CuPy arrays for non-zero voxels.
"""
if tpc_borders is None:
tpc_borders = detector.TPC_BORDERS
if voxel_size is None:
voxel_size = (
ff_induction.COARSE_VOXEL_SIZE_X,
ff_induction.COARSE_VOXEL_SIZE_Y,
ff_induction.COARSE_VOXEL_SIZE_Z,
)
x_min = float(np.min(tpc_borders[:,0,0])); x_max = float(np.max(tpc_borders[:,0,1]))
y_min = float(np.min(tpc_borders[:,1,0])); y_max = float(np.max(tpc_borders[:,1,1]))
if len(tracks) > 0:
z_min_tracks = float(min(tracks['z_start'].min(), tracks['z_end'].min()))
z_max_tracks = float(max(tracks['z_start'].max(), tracks['z_end'].max()))
z_min = min(z_min_tracks, float(np.min(tpc_borders[:,2,0]))) - z_padding
z_max = max(z_max_tracks, float(np.max(tpc_borders[:,2,1]))) + z_padding
else:
z_min = float(np.min(tpc_borders[:,2,0]))
z_max = float(np.max(tpc_borders[:,2,1]))
bounds = ((x_min,x_max),(y_min,y_max),(z_min,z_max))
grid_shape = (
int(np.ceil((x_max - x_min)/voxel_size[0])),
int(np.ceil((y_max - y_min)/voxel_size[1])),
int(np.ceil((z_max - z_min)/voxel_size[2])),
)
total_voxels = grid_shape[0]*grid_shape[1]*grid_shape[2]
voxel_charges = cp.zeros(total_voxels, dtype=cp.float32)
TPB = 256
BPG = (tracks.shape[0] + TPB - 1)//TPB
voxelize_segments_kernel[BPG, TPB](tracks, voxel_charges,
cp.asarray(voxel_size, dtype=cp.float32),
cp.asarray(bounds, dtype=cp.float32),
cp.asarray(grid_shape, dtype=cp.int32))
# Transfer to CPU for numpy operation
voxel_charges_np = cp.asnumpy(voxel_charges)
nonzero_indices = np.where(voxel_charges_np > 0)[0]
# Return as cupy arrays
return cp.asarray(nonzero_indices), cp.asarray(voxel_charges_np[nonzero_indices]), grid_shape, voxel_size, bounds
@cuda.jit
def voxelize_segments_kernel(
tracks: cpt.NDArray,
voxel_charges: cpt.NDArray[cp.float32],
voxel_size: cpt.NDArray[cp.float32],
bounds: cp.ndarray[tuple[int, int], cp.float32],
grid_shape: cpt.NDArray[cp.int32]
):
"""
CUDA kernel for fast voxelization of track segments.
Parallelizes the voxelization process across segments.
Args:
tracks: structured track array (fields: x_start, y_start, z_start,
x_end, y_end, z_end, n_electrons, pixel_plane, ...)
voxel_charges: (n_voxels,) output array for voxel charges (flattened 3D grid)
voxel_size: (dx, dy, dt) voxel dimensions
bounds: ((x_min, x_max), (y_min, y_max), (z_min, z_max))
grid_shape: (nx, ny, nt) grid dimensions
"""
i = cuda.grid(1)
if i >= tracks.shape[0]:
return
segment = tracks[i]
# Extract from structured array using field names (Numba handles this)
x_start = segment['x_start']; y_start = segment['y_start']; z_start = segment['z_start']
x_end = segment['x_end']; y_end = segment['y_end']; z_end = segment['z_end']
n_electrons = segment['n_electrons']
# Convert to voxel indices
ix_start = int((x_start - bounds[0][0]) / voxel_size[0])
iy_start = int((y_start - bounds[1][0]) / voxel_size[1])
iz_start = int((z_start - bounds[2][0]) / voxel_size[2])
ix_end = int((x_end - bounds[0][0]) / voxel_size[0])
iy_end = int((y_end - bounds[1][0]) / voxel_size[1])
iz_end = int((z_end - bounds[2][0]) / voxel_size[2])
# Clip to grid bounds
ix_start = max(0, min(grid_shape[0]-1, ix_start))
iy_start = max(0, min(grid_shape[1]-1, iy_start))
iz_start = max(0, min(grid_shape[2]-1, iz_start))
ix_end = max(0, min(grid_shape[0]-1, ix_end))
iy_end = max(0, min(grid_shape[1]-1, iy_end))
iz_end = max(0, min(grid_shape[2]-1, iz_end))
# 3D DDA
# Compute segment direction and length
dx = x_end - x_start
dy = y_end - y_start
dz = z_end - z_start
seg_len = math.sqrt(dx*dx + dy*dy + dz*dz)
if seg_len == 0:
# Zero-length segment, deposit all charge in start voxel
idx = ix_start + iy_start * grid_shape[0] + iz_start * grid_shape[0] * grid_shape[1]
cuda.atomic.add(voxel_charges, idx, n_electrons)
return
# Parametrize segment:
# p(t) = (x_start, y_start, z_start) + t*(dx, dy, dz), t in [0,1]
# Voxel grid bounds
nx, ny, nz = grid_shape[0], grid_shape[1], grid_shape[2]
x0, y0, z0 = bounds[0][0], bounds[1][0], bounds[2][0]
sx, sy, sz = voxel_size[0], voxel_size[1], voxel_size[2]
# Initial voxel indices
ix = ix_start; iy = iy_start; iz = iz_start
# Step direction
step_x = 1 if dx > 0 else -1 if dx < 0 else 0
step_y = 1 if dy > 0 else -1 if dy < 0 else 0
step_z = 1 if dz > 0 else -1 if dz < 0 else 0
# Compute tMax and tDelta for DDA
if step_x != 0:
next_x = x0 + (ix + (step_x > 0)) * sx
tMaxX = (next_x - x_start) / dx if dx != 0 else math.inf
tDeltaX = sx / abs(dx)
else:
tMaxX = math.inf
tDeltaX = math.inf
if step_y != 0:
next_y = y0 + (iy + (step_y > 0)) * sy
tMaxY = (next_y - y_start) / dy if dy != 0 else math.inf
tDeltaY = sy / abs(dy)
else:
tMaxY = math.inf
tDeltaY = math.inf
if step_z != 0:
next_z = z0 + (iz + (step_z > 0)) * sz
tMaxZ = (next_z - z_start) / dz if dz != 0 else math.inf
tDeltaZ = sz / abs(dz)
else:
tMaxZ = math.inf
tDeltaZ = math.inf
t = 0.0
t_end = 1.0
# Traverse voxels
while (0 <= ix < nx) and (0 <= iy < ny) and (0 <= iz < nz):
# Find next voxel boundary crossed
if tMaxX < tMaxY and tMaxX < tMaxZ:
t_next = min(tMaxX, t_end)
elif tMaxY < tMaxZ:
t_next = min(tMaxY, t_end)
else:
t_next = min(tMaxZ, t_end)
# Fraction of segment in this voxel
frac = t_next - t
if frac > 0:
charge = n_electrons * frac
idx = ix + iy * nx + iz * nx * ny
cuda.atomic.add(voxel_charges, idx, charge)
if t_next >= t_end:
break
# Advance to next voxel
if tMaxX < tMaxY and tMaxX < tMaxZ:
ix += step_x
t = tMaxX
tMaxX += tDeltaX
elif tMaxY < tMaxZ:
iy += step_y
t = tMaxY
tMaxY += tDeltaY
else:
iz += step_z
t = tMaxZ
tMaxZ += tDeltaZ
[docs]
def get_voxel_cache(tracks: cpt.NDArray) -> dict[int, VoxelDict]:
"""Get cache of voxels for each TPC.
Args:
tracks: structured track array (fields: x_start, y_start, z_start,
x_end, y_end, z_end, n_electrons, pixel_plane, ...)
Returns:
Dict from TPC index to a VoxelDict
"""
cache = {}
active_tpcs = np.unique(tracks['pixel_plane'].astype(np.int32))
for tpc_idx in active_tpcs:
cache[int(tpc_idx)] = {"x": None, "y": None, "z": None, "q": None}
tpc_tracks = tracks[tracks['pixel_plane'] == tpc_idx]
if len(tpc_tracks) == 0:
continue
indices, charges, grid_shape, voxel_size, bounds = gpu_voxelize(
tpc_tracks, tpc_borders=detector.TPC_BORDERS[[tpc_idx]])
if len(indices) == 0:
continue
vx, vy, vz = voxel_id_to_coordinates(indices, grid_shape, voxel_size, bounds)
cache[int(tpc_idx)] = {
"x": cp.asarray(vx, dtype=cp.float32),
"y": cp.asarray(vy, dtype=cp.float32),
"z": cp.asarray(vz, dtype=cp.float32),
"q": cp.asarray(charges, dtype=cp.float32),
}
return cache