Source code for ome_arrow.transform

"""
Module for transforming OME-Arrow data
(e.g., slices, projections, or other changes).
"""

from typing import Any, Dict, Iterable, List, Optional, Tuple

import numpy as np
import pyarrow as pa

from ome_arrow.ingest import _build_chunks_from_planes, _normalize_chunk_shape
from ome_arrow.meta import OME_ARROW_STRUCT


def slice_ome_arrow(
    data: Dict[str, Any] | pa.StructScalar,
    x_min: int,
    x_max: int,
    y_min: int,
    y_max: int,
    t_indices: Optional[Iterable[int]] = None,
    c_indices: Optional[Iterable[int]] = None,
    z_indices: Optional[Iterable[int]] = None,
    fill_missing: bool = True,
) -> pa.StructScalar:
    """
    Create a cropped copy of an OME-Arrow record.

    Crops spatially to [y_min:y_max, x_min:x_max] (half-open) and, if
    provided, filters/reindexes T/C/Z to the given index sets. When the input
    record carries a chunk grid or chunks, the chunks are rebuilt from the
    cropped planes.

    Parameters
    ----------
    data : dict | pa.StructScalar
        OME-Arrow record.
    x_min, x_max, y_min, y_max : int
        Half-open crop bounds in pixels (0-based).
    t_indices, c_indices, z_indices : Iterable[int] | None
        Optional explicit indices to keep for T, C, Z. If None, keep all.
        Indices outside ``[0, size)`` are silently dropped, duplicates are
        collapsed, and the survivors are sorted ascending and reindexed to
        0..len-1 in the output.
    fill_missing : bool
        If True, any missing (t,c,z) planes in the selection are zero-filled.
        If False, missing planes are simply omitted from the output.

    Returns
    -------
    pa.StructScalar
        New OME-Arrow record with updated sizes and planes.

    Raises
    ------
    ValueError
        If the crop bounds fall outside the image, if a T/C/Z selection keeps
        no valid index, if the record has no planes (missing, null, or
        empty), or if a selected plane's pixel count is not
        ``size_x * size_y``.
    """
    # Unwrap to a plain dict. ``StructScalar.as_py()`` reports null fields as
    # ``None`` instead of omitting the key, so a ``.get(key, default)`` alone
    # does not protect against null ``pixels_meta`` / ``planes``.
    row = data.as_py() if isinstance(data, pa.StructScalar) else dict(data)

    pm_in = row.get("pixels_meta")
    pm = {} if pm_in is None else dict(pm_in)
    sx = int(pm.get("size_x", 1))
    sy = int(pm.get("size_y", 1))
    sz = int(pm.get("size_z", 1))
    sc = int(pm.get("size_c", 1))
    st = int(pm.get("size_t", 1))

    if not (0 <= x_min < x_max <= sx and 0 <= y_min < y_max <= sy):
        raise ValueError(
            f"Crop bounds out of range: x[{x_min},{x_max}) within [0,{sx}), "
            f"y[{y_min},{y_max}) within [0,{sy})."
        )

    # Normalize T/C/Z selections (keep all if None)
    def _norm(sel: Optional[Iterable[int]], size: int) -> List[int]:
        if sel is None:
            return list(range(size))
        # Deduplicate, discard out-of-range entries, return ascending order.
        as_ints = {int(i) for i in sel}
        return sorted(i for i in as_ints if 0 <= i < size)

    keep_t = _norm(t_indices, st)
    keep_c = _norm(c_indices, sc)
    keep_z = _norm(z_indices, sz)

    if not (keep_t and keep_c and keep_z):
        raise ValueError("Selection must keep at least one index in each of T/C/Z.")

    # Reindex maps (old -> new)
    t_map = {t: i for i, t in enumerate(keep_t)}
    c_map = {c: i for i, c in enumerate(keep_c)}
    z_map = {z: i for i, z in enumerate(keep_z)}

    new_sx = x_max - x_min
    new_sy = y_max - y_min
    new_st = len(keep_t)
    new_sc = len(keep_c)
    new_sz = len(keep_z)

    # Fast access to incoming planes. A null ``planes`` field is treated the
    # same as an empty list so the caller gets the ValueError below rather
    # than a TypeError from ``list(None)``.
    planes_raw = row.get("planes")
    planes_in: List[Dict[str, Any]] = [] if planes_raw is None else list(planes_raw)
    if not planes_in:
        raise ValueError("Record contains no planes to slice.")

    # Group incoming planes by (t,c,z); a later duplicate overrides an
    # earlier one.
    by_tcz: Dict[Tuple[int, int, int], Dict[str, Any]] = {
        (int(p["t"]), int(p["c"]), int(p["z"])): p for p in planes_in
    }

    # Helper to crop one plane
    expected_len = sx * sy

    def _crop_pixels(flat: Iterable[int]) -> List[int]:
        arr = np.asarray(flat)
        if arr.size != expected_len:
            # be strict: malformed plane
            raise ValueError(f"Plane has {arr.size} pixels; expected {expected_len}.")
        # Row-major layout: rows are Y, columns are X.
        sub = arr.reshape(sy, sx)[y_min:y_max, x_min:x_max]
        return sub.ravel().tolist()

    # Build new plane list in dense (t,c,z) order using selections
    planes_out: List[Dict[str, Any]] = []
    for tt in keep_t:
        for cc in keep_c:
            for zz in keep_z:
                src = by_tcz.get((tt, cc, zz))
                if src is None:
                    if not fill_missing:
                        continue
                    # zero-fill missing plane
                    pixels = [0] * (new_sx * new_sy)
                else:
                    pixels = _crop_pixels(src["pixels"])
                planes_out.append(
                    {
                        "t": t_map[tt],
                        "c": c_map[cc],
                        "z": z_map[zz],
                        "pixels": pixels,
                    }
                )

    # Filter channel metadata to kept channels and reindex
    channels_in = list(pm.get("channels", []) or [])
    # If channels metadata length mismatches, synthesize minimal entries
    if len(channels_in) != sc:
        channels_in = [
            {"id": f"ch-{i}", "name": f"C{i}", "color_rgba": 0xFFFFFFFF}
            for i in range(sc)
        ]

    channels_out: List[Dict[str, Any]] = []
    for old_c in keep_c:
        new_c = c_map[old_c]
        meta = dict(channels_in[old_c])
        meta["id"] = f"ch-{new_c}"
        # ensure name string
        meta["name"] = str(meta["name"]) if "name" in meta else f"C{new_c}"
        channels_out.append(meta)

    # Update pixels_meta
    pm_out = dict(pm)
    pm_out.update(
        {
            "size_x": new_sx,
            "size_y": new_sy,
            "size_z": new_sz,
            "size_c": new_sc,
            "size_t": new_st,
            "channels": channels_out,
        }
    )
    # If dimension order encoded XYCT/XYZCT etc., keep it as-is
    # (no axis permutation).

    # Assemble new record
    rec_out = dict(row)
    rec_out["pixels_meta"] = pm_out
    rec_out["planes"] = planes_out

    chunk_grid_in = row.get("chunk_grid") or {}
    if chunk_grid_in or row.get("chunks"):
        # The input was chunked: rebuild chunks from the cropped planes,
        # reusing the incoming chunk shape/order where present.
        chunk_shape = (
            int(chunk_grid_in.get("chunk_z", 1)),
            int(chunk_grid_in.get("chunk_y", 512)),
            int(chunk_grid_in.get("chunk_x", 512)),
        )
        chunk_order = str(chunk_grid_in.get("chunk_order") or "ZYX")
        chunks_out = _build_chunks_from_planes(
            planes=planes_out,
            size_t=new_st,
            size_c=new_sc,
            size_z=new_sz,
            size_y=new_sy,
            size_x=new_sx,
            chunk_shape=chunk_shape,
            chunk_order=chunk_order,
        )
        cz, cy, cx = _normalize_chunk_shape(chunk_shape, new_sz, new_sy, new_sx)
        rec_out["chunk_grid"] = {
            "order": "TCZYX",
            "chunk_t": 1,
            "chunk_c": 1,
            "chunk_z": cz,
            "chunk_y": cy,
            "chunk_x": cx,
            "chunk_order": chunk_order,
        }
        rec_out["chunks"] = chunks_out
    else:
        # Not chunked: make sure both keys exist, carrying over whatever
        # (possibly None) values the input had.
        rec_out["chunk_grid"] = row.get("chunk_grid")
        rec_out["chunks"] = row.get("chunks")

    return pa.scalar(rec_out, type=OME_ARROW_STRUCT)