Source code for larndsim.far_field.pixel_classifier

"""
Classifies pixels into physics categories (CHARGE_COLLECTION, CHARGE_NEIGHBOR,
INDUCTION_ONLY, INACTIVE) based on:

- Perpendicular distance from segment line to pixel surface (point-to-box distance)
- Expected diffusion spread (:math:`\sigma_T = \sqrt{2 * D_T * t_\mathrm{drift}}`)
- Far-field induction signal strength estimate

Example workflow:
    # Classify pixels by physics regime
    # For each pixel, all segments are processed to determine if
    # it's in CHARGE_COLLECTION, CHARGE_NEIGHBOR, INDUCTION_ONLY, 
    # or INACTIVE 
    result = classify_pixels(segments, plane_id=0)
    
    # Result contains pixel IDs for each category
    charge_pixel_ids = result.charge_pixels  # CuPy array on GPU
    neighbor_pixel_ids = result.neighbor_pixels
    induction_pixel_ids = result.induction_pixels
"""

import math

from numba import cuda
import numpy as np
import numpy.typing as npt

import cupy as cp
import cupy.typing as cpt

from . import (
    PixelCategory,
    PixelClassificationResult,
)
from ..consts import detector, ff_induction


[docs] def generate_pixel_coordinates(plane_id: int) \ -> tuple[npt.NDArray[np.float32], npt.NDArray[np.float32], npt.NDArray[np.int64]]: """ Generate all pixel center coordinates for a given TPC plane. Uses the same grid structure as pixels_from_track.py for consistency. This ensures that pixel IDs from pixel2id() correspond to the correct physical positions. Args: plane_id: TPC plane index Returns: tuple: (pixel_coords_x, pixel_coords_y, pixel_ids) - pixel_coords_x: (n_pixels,) array of x-coordinates in cm - pixel_coords_y: (n_pixels,) array of y-coordinates in cm - pixel_ids: (n_pixels,) array of unique pixel IDs from pixel2id() """ tpc_border = detector.TPC_BORDERS[plane_id] # Use full-plane index ranges from detector n_pixels_x = detector.N_PIXELS[0] n_pixels_y = detector.N_PIXELS[1] px_indices = np.arange(n_pixels_x, dtype=np.int32) py_indices = np.arange(n_pixels_y, dtype=np.int32) # Create grid px_grid, py_grid = np.meshgrid(px_indices, py_indices, indexing='ij') px_flat = px_grid.ravel() py_flat = py_grid.ravel() # Compute physical coordinates using the global pixel grid convention pixel_coords_x = (tpc_border[0][0] + (px_flat + 0.5) * detector.PIXEL_PITCH).astype(np.float32) pixel_coords_y = (tpc_border[1][0] + (py_flat + 0.5) * detector.PIXEL_PITCH).astype(np.float32) # Compute pixel IDs using the global indexing pixel_ids = (px_flat + detector.N_PIXELS[0] * (py_flat + detector.N_PIXELS[1] * plane_id)).astype(np.int32) return pixel_coords_x, pixel_coords_y, pixel_ids
@cuda.jit def classify_pixels_kernel( segments: cpt.NDArray, pixel_coords_x: cpt.NDArray[cp.float32], pixel_coords_y: cpt.NDArray[cp.float32], categories: cpt.NDArray[cp.int32], z_anode: float, ): """ CUDA kernel to classify each pixel by proximity to segments. For each pixel: 1. Loop over all segments 2. Calculate perpendicular distance from segment to pixel surface (point-to-box) 3. Estimate diffusion spread: :math:`\sigma_T = \sqrt{2 * D_T * t_\mathrm{drift}}` 4. Check proximity thresholds including diffusion spread 5. Assign highest-priority category (CHARGE_COLLECTION > CHARGE_NEIGHBOR > INDUCTION_ONLY) Args: segments: ``(n_segments,)`` structured array with fields ``x_start``, ``y_start``, ``z_start``, ``x_end``, ``y_end``, ``z_end`` (cm), ``n_electrons`` (float), and ``pixel_plane`` (int). pixel_coords_x: (n_pixels,) pixel center x positions (cm) pixel_coords_y: (n_pixels,) pixel center y positions (cm) categories: (n_pixels,) output array of PixelCategory enum values z_anode: anode z-position for this plane (cm) """ i_pixel = cuda.grid(1) if i_pixel >= categories.shape[0]: return px = pixel_coords_x[i_pixel] py = pixel_coords_y[i_pixel] # Plane is fixed per-kernel launch; z_anode provided by host # Start with INACTIVE, upgrade as we find relevant segments pixel_category = PixelCategory.INACTIVE # Accumulate induction signals from all segments # This handles high-multiplicity events where many small contributions add up total_induction_signal = 0.0 # Loop over all segments for i_seg in range(segments.shape[0]): seg = segments[i_seg] # Segments are pre-filtered by plane on host # Skip if segment has no charge if seg["n_electrons"] <= 0: continue # Segment endpoints in x-y plane x_start = seg["x_start"] y_start = seg["y_start"] x_end = seg["x_end"] y_end = seg["y_end"] # Vector v from start to end (projected onto x-y plane) vx = x_end - x_start vy = y_end - y_start seg_length_xy = math.sqrt(vx * vx + vy * vy) # Calculate perpendicular distance from pixel to segment # Use point-to-box distance: measure distance to closest point on pixel square # Pixel box boundaries: [px - half_pitch, px + half_pitch] x [py - half_pitch, py + half_pitch] half_pitch = detector.PIXEL_PITCH * 0.5 if seg_length_xy < 1e-6: # Vertical segment (no x-y extent) - use distance to start point # Clamp segment start point to pixel box boundary x_clamped = x_start if x_clamped < px - half_pitch: x_clamped = px - half_pitch elif x_clamped > px + half_pitch: x_clamped = px + half_pitch y_clamped = y_start if y_clamped < py - half_pitch: y_clamped = py - half_pitch elif y_clamped > py + half_pitch: y_clamped = py + half_pitch dx = x_start - x_clamped dy = y_start - y_clamped r_trans = math.sqrt(dx * dx + dy * dy) # Choose midpoint in z for vertical segment seg_z_closest = 0.5 * (seg["z_start"] + seg["z_end"]) # representative z else: # Non-vertical segment: find closest point on line segment to pixel box # First, find closest point on infinite line to pixel center dx_to_pixel = px - x_start dy_to_pixel = py - y_start # Project pixel center onto segment line: t = dot(pixel-start, v) / |v|^2 t = (dx_to_pixel * vx + dy_to_pixel * vy) / (seg_length_xy * seg_length_xy) # Clamp t to [0, 1] to stay on segment if t < 0.0: t = 0.0 elif t > 1.0: t = 1.0 # Closest point on segment to pixel center x_closest = x_start + t * vx y_closest = y_start + t * vy seg_z_closest = seg["z_start"] + t * (seg["z_end"] - seg["z_start"]) # Now clamp this point to the pixel box boundary # This gives us the closest point on the pixel surface to the segment x_clamped = x_closest if x_clamped < px - half_pitch: x_clamped = px - half_pitch elif x_clamped > px + half_pitch: x_clamped = px + half_pitch y_clamped = y_closest if y_clamped < py - half_pitch: y_clamped = py - half_pitch elif y_clamped > py + half_pitch: y_clamped = py + half_pitch # Distance from segment's closest point to pixel box boundary dx = x_closest - x_clamped dy = y_closest - y_clamped r_trans = math.sqrt(dx * dx + dy * dy) # Estimate drift distance and time # Note: edep-sim segments can have start/end in any order along z # We need to consider the drift time range for the segment # z_anode passed from host; represents anode position for this plane # For classification, use drift time at closest point on segment drift_distance_closest = abs(seg_z_closest - z_anode) # Also compute drift time range for the entire segment # This accounts for segments that span a large z-range drift_distance_start = abs(seg["z_start"] - z_anode) drift_distance_end = abs(seg["z_end"] - z_anode) drift_distance_max = max(drift_distance_start, drift_distance_end) # Use maximum drift time for diffusion estimate (most conservative) # This ensures we don't under-classify pixels that see diffused charge # from the farthest part of the segment t_drift = drift_distance_max / detector.V_DRIFT # μs # Transverse diffusion spread sigma_T = math.sqrt(2.0 * detector.TRAN_DIFF * t_drift) # Effective radius including diffusion thresholds_cc = ff_induction.CHARGE_COLLECTION_RADIUS * detector.PIXEL_PITCH r_eff_cc = thresholds_cc + detector.DIFF_N_SIGMAS * sigma_T thresholds_neighbor = ff_induction.CHARGE_NEIGHBOR_RADIUS * detector.PIXEL_PITCH r_eff_neighbor = thresholds_neighbor + detector.DIFF_N_SIGMAS * sigma_T # Check charge collection first (highest priority) if r_trans <= r_eff_cc: pixel_category = PixelCategory.CHARGE_COLLECTION break # Can't get higher priority, stop checking # Check charge neighbor if r_trans <= r_eff_neighbor: # Upgrade if we're currently INACTIVE or INDUCTION_ONLY if pixel_category < PixelCategory.CHARGE_NEIGHBOR: pixel_category = PixelCategory.CHARGE_NEIGHBOR continue # Accumulate induction signal (lowest priority) # We accumulate from all segments, then check threshold at the end if r_trans <= ff_induction.INDUCTION_CUTOFF_RADIUS: # 3D distance: include drift distance at closest point r3d = math.sqrt(r_trans * r_trans + drift_distance_closest * drift_distance_closest) if r3d > 0.05: # avoid singularity # Normalization: characteristic scale remains PIXEL_PITCH r_normalized = r3d / detector.PIXEL_PITCH signal_estimate = seg["n_electrons"] / (r_normalized * r_normalized) total_induction_signal += signal_estimate # After checking all segments, apply induction classification if threshold met # Only upgrade to INDUCTION_ONLY if we're currently INACTIVE if total_induction_signal >= ff_induction.INDUCTION_SIGNAL_THRESHOLD: if pixel_category == PixelCategory.INACTIVE: pixel_category = PixelCategory.INDUCTION_ONLY # Write final category categories[i_pixel] = pixel_category
[docs] def classify_pixels( segments: cpt.NDArray, plane_id: int ) -> PixelClassificationResult: """ Classify pixels into physics regimes based on segment proximity and diffusion. This function is the main entry point for pixel classification. It: 1. Generates pixel coordinates for the specified TPC plane 2. Validates inputs and transfers data to GPU if needed 3. Launches the classification kernel 4. Extracts per-category pixel indices 5. Returns a PixelClassificationResult with categorized pixel lists and pixel IDs Args: segments: ``(n_segments,)`` structured array with fields ``x_start``, ``y_start``, ``z_start``, ``x_end``, ``y_end``, ``z_end`` (cm), ``n_electrons`` (float), and ``pixel_plane`` (int). plane_id: TPC plane identifier. Returns: PixelClassificationResult: Categorized pixel lists and pixel IDs. See :class:`~larndsim.far_field.PixelClassificationResult` for field details. Raises: ValueError: If input arrays have mismatched sizes or invalid data. """ # Generate pixel coordinates for this TPC plane px_np, py_np, pixel_ids = generate_pixel_coordinates(plane_id) n_pixels = len(px_np) # Filter segments to only those on the specified plane plane_mask = segments["pixel_plane"] == plane_id segments_np = segments[plane_mask] if len(segments_np) == 0: # No segments on this plane -> all pixels inactive, return empty arrays return PixelClassificationResult( charge_pixels=cp.array([], dtype=np.int32), neighbor_pixels=cp.array([], dtype=np.int32), induction_pixels=cp.array([], dtype=np.int32), induction_pixels_x=cp.array([], dtype=np.float32), induction_pixels_y=cp.array([], dtype=np.float32), ) # Move data to device memory for kernel consumption segments_dev = cuda.to_device(segments_np) px_dev = cuda.to_device(px_np.astype(np.float32)) py_dev = cuda.to_device(py_np.astype(np.float32)) categories_dev = cuda.device_array(n_pixels, dtype=np.int32) # Launch kernel (1D grid) threads_per_block = 256 blocks = (n_pixels + threads_per_block - 1) // threads_per_block # Determine z_anode on host and pass as scalar to kernel z_anode = float(detector.TPC_BORDERS[int(plane_id)][2][0]) classify_pixels_kernel[blocks, threads_per_block]( segments_dev, px_dev, py_dev, categories_dev, z_anode, ) # Copy back categories to host categories_host = categories_dev.copy_to_host() # Build masks and index arrays (NumPy first) charge_mask_h = categories_host == int(PixelCategory.CHARGE_COLLECTION) neighbor_mask_h = categories_host == int(PixelCategory.CHARGE_NEIGHBOR) induction_mask_h = categories_host == int(PixelCategory.INDUCTION_ONLY) charge_pixels_h = np.where(charge_mask_h)[0].astype(np.int32) neighbor_pixels_h = np.where(neighbor_mask_h)[0].astype(np.int32) induction_pixels_h = np.where(induction_mask_h)[0].astype(np.int32) # Map indices to pixel IDs charge_pixel_ids_h = pixel_ids[charge_pixels_h] neighbor_pixel_ids_h = pixel_ids[neighbor_pixels_h] induction_pixel_ids_h = pixel_ids[induction_pixels_h] # Coordinates for induction-only pixels (NumPy) induction_pixels_x_h = px_np[induction_pixels_h].astype(np.float32) induction_pixels_y_h = py_np[induction_pixels_h].astype(np.float32) # Convert outputs to CuPy arrays for downstream GPU use charge_pixel_ids_gpu = cp.asarray(charge_pixel_ids_h) neighbor_pixel_ids_gpu = cp.asarray(neighbor_pixel_ids_h) induction_pixel_ids_gpu = cp.asarray(induction_pixel_ids_h) induction_pixels_x_gpu = cp.asarray(induction_pixels_x_h) induction_pixels_y_gpu = cp.asarray(induction_pixels_y_h) # Return pixel IDs and coordinates for the categories return PixelClassificationResult( charge_pixels=charge_pixel_ids_gpu, neighbor_pixels=neighbor_pixel_ids_gpu, induction_pixels=induction_pixel_ids_gpu, induction_pixels_x=induction_pixels_x_gpu, induction_pixels_y=induction_pixels_y_gpu, )
[docs] def get_classification_cache(segments: cpt.NDArray) \ -> dict[int, PixelClassificationResult]: """Get cache of pixel classes for all active TPCs. Args: segments: ``(n_segments,)`` structured array with fields ``x_start``, ``y_start``, ``z_start``, ``x_end``, ``y_end``, ``z_end`` (cm), ``n_electrons`` (float), and ``pixel_plane`` (int). Returns: Dict from TPC index to PixelClassificationResult """ cache = {} active_tpcs = np.unique(segments['pixel_plane'].astype(np.int32)) segments_cpu = cp.asnumpy(segments) for tpc_idx in active_tpcs: cache[int(tpc_idx)] = classify_pixels(segments_cpu, plane_id=int(tpc_idx)) return cache