# Source code for ome_arrow.export

"""
Module for exporting OME-Arrow data to other formats.
"""

from typing import Any, Dict, List, Optional, Sequence, Tuple

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

from ome_arrow.meta import OME_ARROW_STRUCT, OME_ARROW_TAG_TYPE, OME_ARROW_TAG_VERSION


def to_numpy(
    data: "Dict[str, Any] | pa.StructScalar",
    dtype: np.dtype = np.uint16,
    strict: bool = True,
    clamp: bool = False,
) -> np.ndarray:
    """
    Convert an OME-Arrow record into a NumPy array shaped (T, C, Z, Y, X).

    The OME-Arrow "planes" are flattened YX slices indexed by (z, t, c).
    When chunks are present, this function reconstitutes the dense TCZYX
    array from chunked pixels instead of planes.

    Args:
        data: OME-Arrow data as a Python dict, a `pa.StructScalar`, or any
            object exposing an ``as_py()`` method that yields such a dict.
        dtype: Output dtype (default: np.uint16). If different from plane
            values, a cast (and optional clamp) is applied.
        strict: When True, raise if a plane/chunk has the wrong pixel
            length. When False, truncate/pad it to the expected length.
        clamp: If True, clamp values to the valid range of the target dtype
            before casting (meaningful for integer dtypes; a no-op for
            floats and other dtypes).

    Returns:
        np.ndarray: Dense array with shape (T, C, Z, Y, X).

    Raises:
        KeyError: If required OME-Arrow fields are missing.
        ValueError: If dimensions are invalid or planes/chunks are malformed.

    Examples:
        >>> arr = to_numpy(my_row)  # (T, C, Z, Y, X)
        >>> arr.shape
        (1, 2, 1, 512, 512)
    """
    # Unwrap Arrow scalars (or anything exposing ``as_py``) to a plain dict.
    # Duck typing avoids a hard runtime dependency on the pyarrow class here.
    if hasattr(data, "as_py"):
        data = data.as_py()

    pm = data["pixels_meta"]
    sx, sy = int(pm["size_x"]), int(pm["size_y"])
    sz, sc, st = int(pm["size_z"]), int(pm["size_c"]), int(pm["size_t"])
    if sx <= 0 or sy <= 0 or sz <= 0 or sc <= 0 or st <= 0:
        raise ValueError("All size_* fields must be positive integers.")
    expected_plane_len = sx * sy

    # Prepare target array (T,C,Z,Y,X), zero-filled by default.
    out = np.zeros((st, sc, sz, sy, sx), dtype=dtype)

    # Clamp bounds for the output dtype: only integer dtypes have finite
    # bounds; floats (and rare dtypes) rely on astype alone.
    if np.issubdtype(dtype, np.integer):
        info = np.iinfo(dtype)
        lo, hi = info.min, info.max
    else:
        lo, hi = -np.inf, np.inf

    def _cast_plane(a: np.ndarray) -> np.ndarray:
        # Optionally clamp, then cast without copying when already correct.
        if clamp:
            a = np.clip(a, lo, hi)
        return a.astype(dtype, copy=False)

    chunks = data.get("chunks") or []
    if chunks:
        chunk_grid = data.get("chunk_grid") or {}
        chunk_order = str(chunk_grid.get("chunk_order") or "ZYX").upper()
        if chunk_order != "ZYX":
            raise ValueError("Only chunk_order='ZYX' is supported for now.")
        for i, ch in enumerate(chunks):
            # Chunk coordinates include time/channel plus spatial indices.
            t = int(ch["t"])
            c = int(ch["c"])
            z = int(ch["z"])
            y = int(ch["y"])
            x = int(ch["x"])
            # Chunk shape is only spatial (Z, Y, X).
            shape_z = int(ch["shape_z"])
            shape_y = int(ch["shape_y"])
            shape_x = int(ch["shape_x"])

            # Validate chunk indices and extents within the full 5D array.
            if not (0 <= t < st and 0 <= c < sc and 0 <= z < sz):
                raise ValueError(
                    f"chunks[{i}] index out of range: (t,c,z)=({t},{c},{z})"
                )
            if y < 0 or x < 0 or shape_z <= 0 or shape_y <= 0 or shape_x <= 0:
                raise ValueError(f"chunks[{i}] has invalid shape or origin.")
            if z + shape_z > sz:
                raise ValueError(
                    f"chunks[{i}] extent out of range: z+shape_z={z + shape_z} "
                    f"> sz={sz}"
                )
            if y + shape_y > sy:
                raise ValueError(
                    f"chunks[{i}] extent out of range: y+shape_y={y + shape_y} "
                    f"> sy={sy}"
                )
            if x + shape_x > sx:
                raise ValueError(
                    f"chunks[{i}] extent out of range: x+shape_x={x + shape_x} "
                    f"> sx={sx}"
                )

            pix = ch["pixels"]
            try:
                n = len(pix)
            except Exception as e:
                raise ValueError(f"chunks[{i}].pixels is not a sequence") from e
            expected_len = shape_z * shape_y * shape_x
            if n != expected_len:
                if strict:
                    raise ValueError(
                        f"chunks[{i}].pixels length {n} != expected {expected_len}"
                    )
                # Lenient mode: truncate or zero-pad to the expected size.
                if n > expected_len:
                    pix = pix[:expected_len]
                else:
                    pix = list(pix) + [0] * (expected_len - n)

            arr3d = np.asarray(pix).reshape(shape_z, shape_y, shape_x)
            arr3d = _cast_plane(arr3d)
            out[t, c, z : z + shape_z, y : y + shape_y, x : x + shape_x] = arr3d
        return out

    # Fill planes.
    for i, p in enumerate(data.get("planes", [])):
        z = int(p["z"])
        t = int(p["t"])
        c = int(p["c"])
        if not (0 <= z < sz and 0 <= t < st and 0 <= c < sc):
            raise ValueError(f"planes[{i}] index out of range: (z,t,c)=({z},{t},{c})")

        pix = p["pixels"]
        # Ensure sequence-like and correct length.
        try:
            n = len(pix)
        except Exception as e:
            raise ValueError(f"planes[{i}].pixels is not a sequence") from e
        if n != expected_plane_len:
            if strict:
                raise ValueError(
                    f"planes[{i}].pixels length {n} != size_x*size_y "
                    f"{expected_plane_len}"
                )
            # Lenient mode: fix length by truncation or zero-pad.
            if n > expected_plane_len:
                pix = pix[:expected_plane_len]
            else:
                pix = list(pix) + [0] * (expected_plane_len - n)

        # Reshape to (Y,X) and cast.
        arr2d = np.asarray(pix).reshape(sy, sx)
        arr2d = _cast_plane(arr2d)
        out[t, c, z] = arr2d

    return out
def plane_from_chunks(
    data: "Dict[str, Any] | pa.StructScalar",
    *,
    t: int,
    c: int,
    z: int,
    dtype: np.dtype = np.uint16,
    strict: bool = True,
    clamp: bool = False,
) -> np.ndarray:
    """Extract a single (t, c, z) plane using chunked pixels when available.

    The returned plane spans the full X/Y extent for the given (t, c, z),
    so x/y indices are implicit.

    Args:
        data: OME-Arrow data as a Python dict, a `pa.StructScalar`, or any
            object exposing an ``as_py()`` method that yields such a dict.
        t: Time index for the plane.
        c: Channel index for the plane.
        z: Z index for the plane.
        dtype: Output dtype (default: np.uint16).
        strict: When True, raise if chunk pixels are malformed; when False,
            skip bad chunks and truncate/pad bad pixel buffers.
        clamp: If True, clamp values to the valid range of the target dtype
            (integer dtypes only; a no-op for floats).

    Returns:
        np.ndarray: 2D array with shape (Y, X).

    Raises:
        KeyError: If required OME-Arrow fields are missing.
        ValueError: If indices are out of range or pixels are malformed.
    """
    # Unwrap Arrow scalars (or anything exposing ``as_py``) to a plain dict.
    # Duck typing avoids a hard runtime dependency on the pyarrow class here.
    if hasattr(data, "as_py"):
        data = data.as_py()

    # Read pixel metadata and validate requested plane indices.
    pm = data["pixels_meta"]
    sx, sy = int(pm["size_x"]), int(pm["size_y"])
    sz, sc, st = int(pm["size_z"]), int(pm["size_c"]), int(pm["size_t"])
    if not (0 <= t < st and 0 <= c < sc and 0 <= z < sz):
        raise ValueError(f"Requested plane (t={t}, c={c}, z={z}) out of range.")

    # Clamp bounds for the output dtype: only integer dtypes have finite
    # bounds; floats (and rare dtypes) rely on astype alone.
    if np.issubdtype(dtype, np.integer):
        info = np.iinfo(dtype)
        lo, hi = info.min, info.max
    else:
        lo, hi = -np.inf, np.inf

    def _cast_plane(a: np.ndarray) -> np.ndarray:
        if clamp:
            a = np.clip(a, lo, hi)
        return a.astype(dtype, copy=False)

    # Prefer chunked pixels if present, assembling the requested Z plane.
    chunks = data.get("chunks") or []
    if chunks:
        chunk_grid = data.get("chunk_grid") or {}
        chunk_order = str(chunk_grid.get("chunk_order") or "ZYX").upper()
        if chunk_order != "ZYX":
            raise ValueError("Only chunk_order='ZYX' is supported for now.")

        # Allocate an empty XY plane; fill in tiles from matching chunks.
        plane = np.zeros((sy, sx), dtype=dtype)
        any_chunk_matched = False
        for i, ch in enumerate(chunks):
            # Skip chunks from other (t, c) positions.
            if int(ch["t"]) != t or int(ch["c"]) != c:
                continue
            z0 = int(ch["z"])
            szc = int(ch["shape_z"])
            # Skip chunks whose Z slab does not cover the target plane.
            if not (z0 <= z < z0 + szc):
                continue
            y0 = int(ch["y"])
            x0 = int(ch["x"])
            syc = int(ch["shape_y"])
            sxc = int(ch["shape_x"])

            # Validate chunk bounds (strict mode fails fast; lenient skips).
            if z0 < 0 or y0 < 0 or x0 < 0:
                msg = f"chunks[{i}] has negative origin: (z,y,x)=({z0},{y0},{x0})"
                if strict:
                    raise ValueError(msg)
                continue
            if z0 + szc > sz:
                msg = f"chunks[{i}] extent out of range: z+shape_z={z0 + szc} > sz={sz}"
                if strict:
                    raise ValueError(msg)
                continue
            if y0 + syc > sy:
                msg = f"chunks[{i}] extent out of range: y+shape_y={y0 + syc} > sy={sy}"
                if strict:
                    raise ValueError(msg)
                continue
            if x0 + sxc > sx:
                msg = f"chunks[{i}] extent out of range: x+shape_x={x0 + sxc} > sx={sx}"
                if strict:
                    raise ValueError(msg)
                continue

            pix = ch["pixels"]
            try:
                n = len(pix)
            except Exception as e:
                raise ValueError(f"chunks[{i}].pixels is not a sequence") from e
            expected_len = szc * syc * sxc
            if n != expected_len:
                if strict:
                    raise ValueError(
                        f"chunks[{i}].pixels length {n} != expected {expected_len}"
                    )
                # Lenient mode: truncate or zero-pad to match the expected size.
                if n > expected_len:
                    pix = pix[:expected_len]
                else:
                    pix = list(pix) + [0] * (expected_len - n)

            # Convert to a Z/Y/X slab and copy the requested Z slice into
            # the plane.
            slab = np.asarray(pix).reshape(szc, syc, sxc)
            slab = _cast_plane(slab)
            zi = z - z0
            plane[y0 : y0 + syc, x0 : x0 + sxc] = slab[zi]
            any_chunk_matched = True
        if any_chunk_matched:
            return plane

    # Fallback to planes list if chunks are absent (or none matched).
    target = next(
        (
            p
            for p in data.get("planes", [])
            if int(p["t"]) == t and int(p["c"]) == c and int(p["z"]) == z
        ),
        None,
    )
    if target is None:
        raise ValueError(f"plane (t={t}, c={c}, z={z}) not found")

    pix = target["pixels"]
    try:
        n = len(pix)
    except Exception as e:
        raise ValueError("plane pixels is not a sequence") from e
    expected_len = sx * sy
    if n != expected_len:
        if strict:
            raise ValueError(f"plane pixels length {n} != size_x*size_y {expected_len}")
        if n > expected_len:
            pix = pix[:expected_len]
        else:
            pix = list(pix) + [0] * (expected_len - n)

    arr2d = np.asarray(pix).reshape(sy, sx)
    return _cast_plane(arr2d)
def to_ome_tiff(
    data: Dict[str, Any] | pa.StructScalar,
    out_path: str,
    *,
    dtype: np.dtype = np.uint16,
    clamp: bool = False,
    dim_order: str = "TCZYX",
    compression: Optional[str] = "zlib",  # "zlib","lzma","jpegxl", or None
    compression_level: int = 6,
    tile: Optional[Tuple[int, int]] = None,  # (Y, X)
    use_channel_colors: bool = False,
) -> None:
    """
    Export an OME-Arrow record to OME-TIFF using BioIO's OmeTiffWriter.

    Args:
        data: OME-Arrow data as a Python dict or a `pa.StructScalar`.
        out_path: Destination path for the OME-TIFF file.
        dtype: Output pixel dtype passed through to `to_numpy`.
        clamp: If True, clamp pixel values to the dtype range before casting.
        dim_order: On-disk dimension order; must match the (T,C,Z,Y,X) array.
        compression: tifffile compression codec name, or None for none.
        compression_level: Compression level (only applied for "zlib").
        tile: Optional (Y, X) tile size forwarded to tifffile.
        use_channel_colors: If True, derive per-channel RGB colors from the
            channels' "color_rgba" metadata (missing values default to white).

    Notes
    -----
    - No 'bigtiff' kwarg is passed (invalid for tifffile.TiffWriter.write()).
      BigTIFF selection is automatic based on file size.
    """
    from ome_arrow.export import to_numpy  # your existing function

    # Prefer the modern bioio writer; fall back to the plugin package.
    try:
        from bioio.writers import OmeTiffWriter
    except Exception:
        from bioio_ome_tiff.writers import OmeTiffWriter  # type: ignore

    # PhysicalPixelSizes (robust import or shim): try modern bioio, then
    # older bioio layout, then aicsimageio, then a local NamedTuple stand-in.
    try:
        from bioio import PhysicalPixelSizes  # modern bioio
    except Exception:
        try:
            from bioio.types import PhysicalPixelSizes
        except Exception:
            try:
                from aicsimageio.types import PhysicalPixelSizes
            except Exception:
                from typing import NamedTuple
                from typing import Optional as _Opt

                class PhysicalPixelSizes(NamedTuple):  # type: ignore
                    # Minimal shim matching the (Z, Y, X) field layout the
                    # writer expects — presumably compatible; TODO confirm.
                    Z: _Opt[float] = None
                    Y: _Opt[float] = None
                    X: _Opt[float] = None

    # 1) Dense array (T,C,Z,Y,X)
    arr = to_numpy(data, dtype=dtype, clamp=clamp)

    # 2) Metadata
    row = data.as_py() if isinstance(data, pa.StructScalar) else data
    pm = row["pixels_meta"]
    _st, sc, _sz, _sy, _sx = arr.shape

    # Channel names: default to "C0", "C1", ...; override from metadata
    # only when the channel list length matches the channel count.
    chs: Sequence[Dict[str, Any]] = pm.get("channels", []) or []
    channel_names = [f"C{i}" for i in range(sc)]
    if len(chs) == sc:
        for i, ch in enumerate(chs):
            nm = ch.get("name")
            if nm is not None:
                channel_names[i] = str(nm)

    # Optional channel colors (guarded)
    channel_colors_for_writer = None
    if use_channel_colors and len(chs) == sc:

        def _rgba_to_rgb(rgba: int) -> int:
            # Drop the alpha byte: 0xRRGGBBAA -> 0xRRGGBB.
            r = (rgba >> 24) & 0xFF
            g = (rgba >> 16) & 0xFF
            b = (rgba >> 8) & 0xFF
            return (r << 16) | (g << 8) | b

        flat_colors: list[int] = []
        for ch in chs:
            rgba = ch.get("color_rgba")
            # Missing/non-int colors fall back to white.
            flat_colors.append(
                _rgba_to_rgb(int(rgba)) if isinstance(rgba, int) else 0xFFFFFF
            )
        if len(flat_colors) == sc:
            channel_colors_for_writer = [flat_colors]  # list-per-image

    # Physical sizes (µm) in Z, Y, X order for BioIO; `or 1.0` also maps
    # explicit 0/None metadata values to the 1.0 default.
    p_dx = float(pm.get("physical_size_x", 1.0) or 1.0)
    p_dy = float(pm.get("physical_size_y", 1.0) or 1.0)
    p_dz = float(pm.get("physical_size_z", 1.0) or 1.0)
    pps_list = [PhysicalPixelSizes(Z=p_dz, Y=p_dy, X=p_dx)]

    # tifffile passthrough (NO 'bigtiff' here)
    tifffile_kwargs: Dict[str, Any] = {}
    if compression is not None:
        tifffile_kwargs["compression"] = compression
        if compression == "zlib":
            tifffile_kwargs["compressionargs"] = {"level": int(compression_level)}
    if tile is not None:
        tifffile_kwargs["tile"] = (int(tile[0]), int(tile[1]))

    # list-per-image payloads (OmeTiffWriter.save takes one entry per image)
    data_list = [arr]
    dim_order_list = [dim_order]
    image_name_list = [str(row.get("name") or row.get("id") or "image")]
    ch_names_list = [channel_names]

    # 3) Write
    OmeTiffWriter.save(
        data_list,
        out_path,
        dim_order=dim_order_list,
        image_name=image_name_list,
        channel_names=ch_names_list,
        channel_colors=channel_colors_for_writer,  # None or [flat list len=sc]
        physical_pixel_sizes=pps_list,
        tifffile_kwargs=tifffile_kwargs,
    )
def to_ome_zarr(
    data: Dict[str, Any] | pa.StructScalar,
    out_path: str,
    *,
    dtype: np.dtype = np.uint16,
    clamp: bool = False,
    # Axes order for the on-disk array — must match arr shape (T,C,Z,Y,X)
    dim_order: str = "TCZYX",
    # NGFF / multiscale
    multiscale_levels: int = 1,  # 1 = no pyramid; >1 builds levels
    downscale_spatial_by: int = 2,  # per-level factor for Z,Y,X
    zarr_format: int = 3,  # 3 (NGFF 0.5) or 2 (NGFF 0.4)
    # Storage knobs
    chunks: Optional[Tuple[int, int, int, int, int]] = None,  # (T,C,Z,Y,X) or None
    shards: Optional[Tuple[int, int, int, int, int]] = None,  # v3 only, optional
    compressor: Optional[str] = "zstd",  # "zstd","lz4","gzip", or None
    compressor_level: int = 3,
    # Optional display metadata (carried through if you later enrich channels/rdefs)
    image_name: Optional[str] = None,
) -> None:
    """
    Write OME-Zarr using your `OMEZarrWriter` (instance API).

    - Builds arr as (T,C,Z,Y,X) using your `to_numpy`.
    - Creates level shapes for a multiscale pyramid (if multiscale_levels>1).
    - Chooses Blosc codec compatible with zarr_format (v2 vs v3).
    - Populates axes names/types/units and physical pixel sizes from pixels_meta.
    - Uses default TCZYX chunks if none are provided.

    Args:
        data: OME-Arrow data as a Python dict or a `pa.StructScalar`.
        out_path: Destination store path for the OME-Zarr output.
        dtype: Output pixel dtype passed through to `to_numpy`.
        clamp: If True, clamp pixel values to the dtype range before casting.
        dim_order: Axis order used for axes names; must match (T,C,Z,Y,X).
        multiscale_levels: Number of pyramid levels (1 = full-res only).
        downscale_spatial_by: Per-level integer downscale factor for Z/Y/X.
        zarr_format: 3 for Zarr v3 (NGFF 0.5); anything else writes v2.
        chunks: Optional (T,C,Z,Y,X) chunk shape; defaults are computed.
        shards: Optional (T,C,Z,Y,X) shard shape; only honored for v3.
        compressor: Blosc inner codec name, or None to disable compression.
        compressor_level: Blosc compression level.
        image_name: Optional image name; falls back to record name/id/"Image".
    """
    # --- local import to avoid hard deps at module import time
    # Use the class you showed
    from bioio_ome_zarr.writers import OMEZarrWriter

    from ome_arrow.export import to_numpy  # your existing function

    # Optional compressors for v2 vs v3: numcodecs Blosc for Zarr v2,
    # zarr.codecs BloscCodec for Zarr v3. Unknown names fall back to zstd.
    compressor_obj = None
    if compressor is not None:
        if zarr_format == 2:
            # numcodecs Blosc (v2 path)
            from numcodecs import Blosc as BloscV2

            cname = {"zstd": "zstd", "lz4": "lz4", "gzip": "zlib"}.get(
                compressor, "zstd"
            )
            compressor_obj = BloscV2(
                cname=cname, clevel=int(compressor_level), shuffle=BloscV2.BITSHUFFLE
            )
        else:
            # zarr v3 codec
            from zarr.codecs import BloscCodec, BloscShuffle

            cname = {"zstd": "zstd", "lz4": "lz4", "gzip": "zlib"}.get(
                compressor, "zstd"
            )
            compressor_obj = BloscCodec(
                cname=cname,
                clevel=int(compressor_level),
                shuffle=BloscShuffle.bitshuffle,
            )

    # 1) Dense pixel data (T,C,Z,Y,X)
    arr = to_numpy(data, dtype=dtype, clamp=clamp)

    # 2) Unwrap OME-Arrow metadata
    row = data.as_py() if isinstance(data, pa.StructScalar) else data
    pm = row["pixels_meta"]
    st, sc, sz, sy, sx = arr.shape

    # 3) Axis metadata (names/types/units aligned with T,C,Z,Y,X)
    axes_names = [a.lower() for a in dim_order]  # ["t","c","z","y","x"]
    axes_types = ["time", "channel", "space", "space", "space"]
    # Units: micrometers for spatial, leave T/C None
    axes_units = [
        None,
        None,
        pm.get("physical_size_z_unit") or "µm",
        pm.get("physical_size_y_unit") or "µm",
        pm.get("physical_size_x_unit") or "µm",
    ]

    # Physical pixel sizes at level 0 in axis order; `or 1.0` also maps
    # explicit 0/None metadata values to the 1.0 default.
    p_dx = float(pm.get("physical_size_x", 1.0) or 1.0)
    p_dy = float(pm.get("physical_size_y", 1.0) or 1.0)
    p_dz = float(pm.get("physical_size_z", 1.0) or 1.0)
    physical_pixel_size = [1.0, 1.0, p_dz, p_dy, p_dx]  # T,C,Z,Y,X

    # 4) Multiscale level shapes (level 0 first). Only spatial dims are downscaled.
    def _down(a: int, f: int) -> int:
        # Integer downscale, never below 1.
        return max(1, a // f)

    def _default_chunks_tcxyz(
        shape: Tuple[int, int, int, int, int],
    ) -> Tuple[int, int, int, int, int]:
        # One timepoint/channel per chunk; up to 4 Z slices and 512x512 tiles.
        _t, _c, z, y, x = shape
        cz = min(z, 4) if z > 1 else 1
        cy = min(y, 512)
        cx = min(x, 512)
        return (1, 1, cz, cy, cx)

    def _level_shapes_tcxyz(levels: int) -> List[Tuple[int, int, int, int, int]]:
        # Level 0 is full resolution; each subsequent level downscales Z/Y/X.
        shapes = [(st, sc, sz, sy, sx)]
        for _ in range(levels - 1):
            t, c, z, y, x = shapes[-1]
            shapes.append(
                (
                    t,
                    c,
                    _down(z, downscale_spatial_by),
                    _down(y, downscale_spatial_by),
                    _down(x, downscale_spatial_by),
                )
            )
        return shapes

    multiscale_levels = max(1, int(multiscale_levels))
    level_shapes: List[Tuple[int, int, int, int, int]] = _level_shapes_tcxyz(
        multiscale_levels
    )

    # 5) Chunking / shards (can be single-shape or per-level;
    # we pass single-shape if provided)
    chunk_shape: Optional[List[Tuple[int, ...]]] = None
    if chunks is None:
        chunks = _default_chunks_tcxyz((st, sc, sz, sy, sx))
    if chunks is not None:
        chunk_shape = [tuple(int(v) for v in chunks)] * multiscale_levels

    shard_shape: Optional[List[Tuple[int, ...]]] = None
    if shards is not None and zarr_format == 3:
        shard_shape = [tuple(int(v) for v in shards)] * multiscale_levels

    # 6) Image name default
    img_name = image_name or str(row.get("name") or row.get("id") or "Image")

    # 7) Instantiate writer with your class constructor
    writer = OMEZarrWriter(
        store=out_path,
        level_shapes=level_shapes,
        dtype=dtype,
        chunk_shape=chunk_shape,
        shard_shape=shard_shape,
        compressor=compressor_obj,
        zarr_format=3 if int(zarr_format) == 3 else 2,
        image_name=img_name,
        channels=None,  # you can map your channel metadata here later
        rdefs=None,  # optional OMERO display metadata
        creator_info=None,  # optional "creator" block
        root_transform=None,  # optional NGFF root transform
        axes_names=axes_names,
        axes_types=axes_types,
        axes_units=axes_units,
        physical_pixel_size=physical_pixel_size,
    )

    # 8) Write full-resolution; writer will build & fill lower levels
    writer.write_full_volume(arr)
def to_ome_parquet(
    data: Dict[str, Any] | pa.StructScalar,
    out_path: str,
    column_name: str = "image",
    file_metadata: Optional[Dict[str, str]] = None,
    compression: Optional[str] = "zstd",
    row_group_size: Optional[int] = None,
) -> None:
    """Write one OME-Arrow record to Parquet as a one-row, one-column table.

    The single column holds a struct value typed with the OME-Arrow schema.

    Args:
        data: OME-Arrow dict or StructScalar.
        out_path: Destination path for the Parquet file.
        column_name: Name of the struct column.
        file_metadata: Optional file-level key/value metadata to attach.
        compression: Parquet compression codec.
        row_group_size: Optional Parquet row-group size.
    """
    # Normalize the record to a plain Python dict; pyarrow builders handle
    # dicts more reliably (e.g. when the struct has a null-typed "masks").
    if isinstance(data, pa.StructScalar):
        record = data.as_py()
    else:
        # Round-trip through a typed scalar to validate against the schema.
        record = pa.scalar(
            {f.name: data.get(f.name) for f in OME_ARROW_STRUCT},
            type=OME_ARROW_STRUCT,
        ).as_py()

    # Build the one-row struct column and wrap it in a single-column table.
    table = pa.table({column_name: pa.array([record], type=OME_ARROW_STRUCT)})

    # Merge the OME-Arrow tags plus caller-supplied metadata into the schema.
    tags: Dict[bytes, bytes] = dict(table.schema.metadata or {})
    try:
        tags[b"ome.arrow.type"] = str(OME_ARROW_TAG_TYPE).encode("utf-8")
        tags[b"ome.arrow.version"] = str(OME_ARROW_TAG_VERSION).encode("utf-8")
    except Exception:
        pass
    if file_metadata:
        for key, value in file_metadata.items():
            tags[str(key).encode("utf-8")] = str(value).encode("utf-8")
    table = table.replace_schema_metadata(tags)

    # Persist the single-row, single-column table.
    pq.write_table(
        table,
        out_path,
        compression=compression,
        row_group_size=row_group_size,
    )
def to_ome_vortex(
    data: Dict[str, Any] | pa.StructScalar,
    out_path: str,
    column_name: str = "image",
    file_metadata: Optional[Dict[str, str]] = None,
) -> None:
    """Write one OME-Arrow record to a Vortex file.

    The on-disk layout is a one-row, one-column Arrow table whose column
    holds a struct typed with the OME-Arrow schema.

    Args:
        data: OME-Arrow dict or StructScalar.
        out_path: Output path for the Vortex file.
        column_name: Name of the struct column.
        file_metadata: Optional file-level key/value metadata to attach.

    Raises:
        ImportError: If the optional `vortex-data` dependency is missing.
    """
    try:
        import vortex.io as vxio
    except ImportError as exc:
        raise ImportError(
            "Vortex export requires the optional 'vortex-data' dependency."
        ) from exc

    # Normalize the record to a plain Python dict; pyarrow builders handle
    # dicts more reliably (e.g. when the struct has a null-typed "masks").
    if isinstance(data, pa.StructScalar):
        record = data.as_py()
    else:
        # Round-trip through a typed scalar to validate against the schema.
        record = pa.scalar(
            {f.name: data.get(f.name) for f in OME_ARROW_STRUCT},
            type=OME_ARROW_STRUCT,
        ).as_py()

    # Build the one-row struct column and wrap it in a single-column table.
    table = pa.table({column_name: pa.array([record], type=OME_ARROW_STRUCT)})

    # Merge the OME-Arrow tags plus caller-supplied metadata into the schema.
    tags: Dict[bytes, bytes] = dict(table.schema.metadata or {})
    try:
        tags[b"ome.arrow.type"] = str(OME_ARROW_TAG_TYPE).encode("utf-8")
        tags[b"ome.arrow.version"] = str(OME_ARROW_TAG_VERSION).encode("utf-8")
    except Exception:
        pass
    if file_metadata:
        for key, value in file_metadata.items():
            tags[str(key).encode("utf-8")] = str(value).encode("utf-8")
    table = table.replace_schema_metadata(tags)

    # Persist via Vortex (single row, single column).
    vxio.write(table, str(out_path))