Source code for iceberg_bioimage.publishing.image_assets

"""Image asset publishing helpers."""

from __future__ import annotations

import json
import warnings
from collections.abc import Iterable
from pathlib import Path
from typing import Callable, Protocol, TypeVar

import pyarrow as pa
from pyiceberg.exceptions import (
    NamespaceAlreadyExistsError,
    NoSuchNamespaceError,
    NoSuchTableError,
)

from iceberg_bioimage.models.scan_result import ImageAsset, ScanResult
from iceberg_bioimage.validation.contracts import raise_for_invalid_scan_result

CYTOTABLE_NAMESPACE_SEGMENT = "cytotable"


[docs] class SupportsAppend(Protocol): """Protocol for appendable Iceberg-like tables."""
[docs] def append(self, table: pa.Table) -> None: """Append a pyarrow table."""
TTable = TypeVar("TTable")
[docs] class SupportsLoadTable(Protocol[TTable]): """Protocol for catalog objects that can load existing tables."""
[docs] def load_table(self, identifier: tuple[str, ...]) -> TTable: """Load an existing table."""
[docs] class SupportsCatalog(Protocol): """Protocol for catalog objects used by the publishing layer."""
[docs] def load_table(self, identifier: tuple[str, ...]) -> SupportsAppend: """Load an existing table."""
[docs] def create_table( self, identifier: tuple[str, ...], schema: object, ) -> SupportsAppend: """Create and return a table."""
[docs] def publish_image_assets( catalog: str | SupportsCatalog, namespace: str | Iterable[str], table_name: str, scan_result: ScanResult, ) -> int: """Publish a scan result into the canonical `image_assets` Iceberg table.""" raise_for_invalid_scan_result(scan_result) rows = scan_result_to_rows(scan_result) table = _load_or_create_table(catalog, namespace, table_name) table.append(pa.Table.from_pylist(rows)) return len(rows)
[docs] def scan_result_to_rows(scan_result: ScanResult) -> list[dict[str, object]]: """Convert a scan result into canonical image_assets rows.""" dataset_id = _dataset_id(scan_result.source_uri) return [ _asset_to_row( dataset_id=dataset_id, format_family=scan_result.format_family, asset=asset, ) for asset in scan_result.image_assets ]
def _asset_to_row( dataset_id: str, format_family: str, asset: ImageAsset, ) -> dict[str, object]: return { "dataset_id": dataset_id, "image_id": asset.image_id or _fallback_image_id(dataset_id, asset.array_path), "format_family": format_family, "uri": asset.uri, "array_path": asset.array_path, "shape_json": json.dumps(asset.shape), "dtype": asset.dtype, "chunk_shape_json": ( json.dumps(asset.chunk_shape) if asset.chunk_shape else None ), "metadata_json": json.dumps(asset.metadata, sort_keys=True) if asset.metadata else None, } def _load_or_create_table( catalog_or_name: str | SupportsCatalog, namespace: str | Iterable[str], table_name: str, *, schema_builder: Callable[[], object] | None = None, ) -> SupportsAppend: catalog = _resolve_catalog(catalog_or_name) requested_namespace = _normalize_namespace(namespace) candidate_namespaces = _namespace_candidates(requested_namespace) try: return _load_table_with_namespace_fallback( catalog, requested_namespace, table_name, operation="publishing", ) except NoSuchTableError: pass build_schema = ( _build_image_assets_schema if schema_builder is None else schema_builder ) preferred_namespace = candidate_namespaces[0] _warn_for_namespace_resolution( requested_namespace, preferred_namespace, table_name, operation="publishing", creating=True, ) _ensure_namespace_exists(catalog, preferred_namespace) identifier = (*preferred_namespace, table_name) return catalog.create_table(identifier, schema=build_schema()) def _resolve_catalog(catalog_or_name: str | SupportsCatalog) -> SupportsCatalog: if not isinstance(catalog_or_name, str): return catalog_or_name try: from pyiceberg.catalog import load_catalog except ImportError as exc: # pragma: no cover - guarded by dependency declaration raise RuntimeError( "PyIceberg is required to resolve a catalog by name. " "Install `pyiceberg` first." ) from exc return load_catalog(catalog_or_name) def _build_image_assets_schema() -> object: try: from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType except ImportError as exc: # pragma: no cover - guarded by dependency declaration raise RuntimeError( "PyIceberg is required for publishing. Install `pyiceberg` first." ) from exc # Field IDs are intentionally non-sequential in _build_image_assets_schema. # These stable NestedField identifiers keep compatibility with prior or # future schema revisions without reusing removed field IDs. return Schema( NestedField( field_id=1, name="dataset_id", field_type=StringType(), required=True, ), NestedField( field_id=2, name="image_id", field_type=StringType(), required=True, ), NestedField( field_id=9, name="format_family", field_type=StringType(), required=True, ), NestedField( field_id=11, name="uri", field_type=StringType(), required=True, ), NestedField( field_id=13, name="array_path", field_type=StringType(), required=False, ), NestedField( field_id=14, name="shape_json", field_type=StringType(), required=True, ), NestedField( field_id=15, name="chunk_shape_json", field_type=StringType(), required=False, ), NestedField( field_id=16, name="dtype", field_type=StringType(), required=True, ), NestedField( field_id=19, name="metadata_json", field_type=StringType(), required=False, ), ) def _normalize_namespace(namespace: str | Iterable[str]) -> tuple[str, ...]: if isinstance(namespace, str): return tuple(part for part in namespace.split(".") if part) return tuple(part for part in namespace if part) def _namespace_candidates( namespace: str | Iterable[str], ) -> tuple[tuple[str, ...], ...]: normalized_namespace = _normalize_namespace(namespace) if _is_cytotable_namespace(normalized_namespace): return (normalized_namespace,) return ((*normalized_namespace, CYTOTABLE_NAMESPACE_SEGMENT), normalized_namespace) def _load_table_with_namespace_fallback( catalog: SupportsLoadTable[TTable], namespace: str | Iterable[str], table_name: str, *, operation: str, ) -> TTable: requested_namespace = _normalize_namespace(namespace) for resolved_namespace in _namespace_candidates(requested_namespace): identifier = (*resolved_namespace, table_name) try: table = catalog.load_table(identifier) except NoSuchTableError: continue _warn_for_namespace_resolution( requested_namespace, resolved_namespace, table_name, operation=operation, ) return table identifier = (*_namespace_candidates(requested_namespace)[0], table_name) raise NoSuchTableError(f"Missing table: {identifier!r}") def _list_tables_with_namespace_fallback( catalog: SupportsCatalog, namespace: str | Iterable[str], ) -> list[tuple[str, ...]]: if not hasattr(catalog, "list_tables"): raise TypeError("Catalog must provide a list_tables(namespace) method.") requested_namespace = _normalize_namespace(namespace) discovered: dict[str, tuple[str, ...]] = {} warned_for_list = False for resolved_namespace in _namespace_candidates(requested_namespace): try: identifiers = catalog.list_tables(resolved_namespace) except NoSuchNamespaceError: continue if identifiers and not warned_for_list: _warn_for_namespace_resolution( requested_namespace, resolved_namespace, "catalog tables", operation="listing", ) warned_for_list = True for identifier in identifiers: discovered.setdefault(identifier[-1], identifier) return [discovered[table_name] for table_name in sorted(discovered)] def _ensure_namespace_exists( catalog: SupportsCatalog, namespace: tuple[str, ...], ) -> None: if hasattr(catalog, "create_namespace_if_not_exists"): catalog.create_namespace_if_not_exists(namespace) return if hasattr(catalog, "create_namespace"): try: catalog.create_namespace(namespace) except NamespaceAlreadyExistsError: # pragma: no cover - backend-specific return def _is_cytotable_namespace(namespace: tuple[str, ...]) -> bool: return bool(namespace) and namespace[-1] == CYTOTABLE_NAMESPACE_SEGMENT def _warn_for_namespace_resolution( requested_namespace: tuple[str, ...], resolved_namespace: tuple[str, ...], table_name: str, *, operation: str, creating: bool = False, ) -> None: if requested_namespace == resolved_namespace and _is_cytotable_namespace( requested_namespace ): return expected_namespace_parts = _namespace_candidates(requested_namespace)[0] expected_namespace = ".".join(expected_namespace_parts) resolved_namespace_name = ".".join(resolved_namespace) expected_full = ( f"{expected_namespace}.{table_name}" if expected_namespace else table_name ) resolved_full = ( f"{resolved_namespace_name}.{table_name}" if resolved_namespace_name else table_name ) action = "Creating" if creating else "Using" warnings.warn( ( f"Namespace '{'.'.join(requested_namespace)}' does not match " f"CytoTable's expected Iceberg namespace layout. {action} " f"'{resolved_full}' during {operation}; " f"CytoTable expects '{expected_full}'." ), UserWarning, stacklevel=3, ) def _dataset_id(uri: str) -> str: name = Path(uri.rstrip("/")).name for suffix in (".ome.zarr", ".zarr", ".ome.tiff", ".ome.tif", ".tiff", ".tif"): if name.endswith(suffix): return name.removesuffix(suffix) return name def _fallback_image_id(dataset_id: str, array_path: str | None) -> str: return dataset_id if array_path is None else f"{dataset_id}:{array_path}"