# Source code for iceberg_bioimage.api

"""Public API entry points."""

from __future__ import annotations

from collections.abc import Sequence
from pathlib import Path

import pyarrow as pa

from iceberg_bioimage.adapters.base import BaseAdapter
from iceberg_bioimage.adapters.ome_tiff import OMETiffAdapter
from iceberg_bioimage.adapters.zarr_v2 import ZarrV2Adapter
from iceberg_bioimage.integrations.duckdb import (
    MetadataSource,
    join_image_assets_with_profiles,
)
from iceberg_bioimage.models.scan_result import (
    ContractValidationResult,
    DatasetSummary,
    RegistrationResult,
    ScanResult,
    WarehouseIngestResult,
)
from iceberg_bioimage.publishing.chunk_index import (
    publish_chunk_index,
    scan_result_to_chunk_rows,
)
from iceberg_bioimage.publishing.image_assets import (
    SupportsCatalog,
    publish_image_assets,
    scan_result_to_rows,
)
from iceberg_bioimage.validation.contracts import (
    validate_microscopy_profile_columns,
    validate_microscopy_profile_table,
)


def _build_adapters() -> Sequence[BaseAdapter]:
    """Return the format adapters, in the order they are tried."""
    adapters: tuple[BaseAdapter, ...] = (
        ZarrV2Adapter(),
        OMETiffAdapter(),
    )
    return adapters


def scan_store(uri: str) -> ScanResult:
    """Scan a supported image store and return canonical metadata.

    The first adapter that reports it can handle ``uri`` performs the scan.
    Raises ValueError for URIs no adapter accepts.
    """
    matching = next(
        (adapter for adapter in _build_adapters() if adapter.can_handle(uri)),
        None,
    )
    if matching is None:
        raise ValueError(
            "Unsupported bioimage URI "
            f"{uri!r}. Supported formats are .zarr, .tif, and .tiff."
        )
    return matching.scan(uri)
def register_store(
    uri: str,
    catalog: str | SupportsCatalog,
    namespace: str | Sequence[str],
    *,
    image_assets_table: str = "image_assets",
    chunk_index_table: str | None = "chunk_index",
) -> RegistrationResult:
    """Scan a store and publish canonical metadata tables.

    Args:
        uri: Location of the image store to scan.
        catalog: Catalog name or catalog-like object to publish into.
        namespace: Target namespace, as a dotted string or a parts sequence.
        image_assets_table: Table that receives the image-asset rows.
        chunk_index_table: Table that receives chunk rows; ``None`` skips
            chunk-index publishing entirely.

    Returns:
        RegistrationResult with the number of rows published per table.
    """
    # Normalize the namespace exactly like ingest_scan_results_to_warehouse
    # does, so both entry points publish to identical table identifiers
    # regardless of whether the caller passed "a.b" or ("a", "b").
    normalized_namespace = _normalize_namespace_parts(namespace)
    scan_result = scan_store(uri)
    image_assets_rows = publish_image_assets(
        catalog,
        normalized_namespace,
        image_assets_table,
        scan_result,
    )
    chunk_rows = 0
    if chunk_index_table is not None:
        chunk_rows = publish_chunk_index(
            catalog,
            normalized_namespace,
            chunk_index_table,
            scan_result,
        )
    return RegistrationResult(
        source_uri=scan_result.source_uri,
        image_assets_rows_published=image_assets_rows,
        chunk_rows_published=chunk_rows,
    )
def ingest_scan_results_to_warehouse(
    scan_results: Sequence[ScanResult],
    catalog: str | SupportsCatalog,
    namespace: str | Sequence[str],
    *,
    image_assets_table: str = "image_assets",
    chunk_index_table: str | None = "chunk_index",
) -> WarehouseIngestResult:
    """Publish many scanned datasets into a Cytotable-compatible warehouse.

    Each scan result is published to the image-assets table and, unless
    ``chunk_index_table`` is ``None``, to the chunk-index table as well.
    Per-dataset warnings are accumulated into the returned result.
    """
    parts = _normalize_namespace_parts(namespace)
    datasets: list[RegistrationResult] = []
    warnings: list[str] = []
    for result in scan_results:
        published_assets = publish_image_assets(
            catalog,
            parts,
            image_assets_table,
            result,
        )
        if chunk_index_table is None:
            published_chunks = 0
        else:
            published_chunks = publish_chunk_index(
                catalog,
                parts,
                chunk_index_table,
                result,
            )
        datasets.append(
            RegistrationResult(
                source_uri=result.source_uri,
                image_assets_rows_published=published_assets,
                chunk_rows_published=published_chunks,
            )
        )
        warnings.extend(result.warnings)
    # Record a readable catalog identifier even when a catalog object is given.
    catalog_label = catalog if isinstance(catalog, str) else type(catalog).__name__
    return WarehouseIngestResult(
        catalog=catalog_label,
        namespace=list(parts),
        image_assets_table=image_assets_table,
        chunk_index_table=chunk_index_table,
        datasets=datasets,
        warnings=warnings,
    )
def ingest_stores_to_warehouse(
    uris: Sequence[str],
    catalog: str | SupportsCatalog,
    namespace: str | Sequence[str],
    *,
    image_assets_table: str = "image_assets",
    chunk_index_table: str | None = "chunk_index",
) -> WarehouseIngestResult:
    """Scan and publish many datasets into a Cytotable-compatible warehouse.

    Convenience wrapper: scans every URI up front, then delegates the
    publishing to :func:`ingest_scan_results_to_warehouse`.
    """
    scanned = list(map(scan_store, uris))
    return ingest_scan_results_to_warehouse(
        scanned,
        catalog,
        namespace,
        image_assets_table=image_assets_table,
        chunk_index_table=chunk_index_table,
    )
def summarize_scan_result(scan_result: ScanResult) -> DatasetSummary:
    """Build a concise user-facing summary from a scan result."""
    assets = scan_result.image_assets

    def distinct(key, cast, keep):
        # Sorted set of one metadata field across assets; `keep` decides
        # which raw values count as present.
        return sorted(
            {cast(asset.metadata[key]) for asset in assets if keep(asset.metadata.get(key))}
        )

    axes = distinct("axes", str, bool)
    # A channel_count of 0 is still meaningful, so only None is excluded.
    channel_counts = distinct("channel_count", int, lambda value: value is not None)
    storage_variants = distinct("storage_variant", str, bool)
    unique_shapes = sorted({tuple(asset.shape) for asset in assets})
    return DatasetSummary(
        source_uri=scan_result.source_uri,
        format_family=scan_result.format_family,
        image_asset_count=len(assets),
        chunked_asset_count=sum(1 for asset in assets if asset.chunk_shape is not None),
        array_paths=[
            "<root>" if asset.array_path is None else asset.array_path
            for asset in assets
        ],
        dtypes=sorted({asset.dtype for asset in assets}),
        shapes=[list(shape) for shape in unique_shapes],
        axes=axes,
        channel_counts=channel_counts,
        storage_variants=storage_variants,
        warnings=list(scan_result.warnings),
    )
def summarize_store(uri: str) -> DatasetSummary:
    """Scan a store and return a concise dataset summary."""
    scan_result = scan_store(uri)
    return summarize_scan_result(scan_result)
def join_profiles_with_scan_result(
    scan_result: ScanResult,
    profiles: MetadataSource,
    *,
    include_chunks: bool = False,
    profile_dataset_id: str | None = None,
) -> pa.Table:
    """Join canonical image assets from a scan result to profile rows.

    This helper uses the optional DuckDB integration at runtime. Install
    the `duckdb` extra/group before calling it.
    """
    validation = _validate_profiles(profiles)
    dataset_id_missing = "dataset_id" in validation.missing_required_columns
    if profile_dataset_id is not None and dataset_id_missing:
        # An explicit dataset id substitutes for a dataset_id column.
        # NOTE(review): assumes validation.is_valid reflects this mutated
        # list — confirm against ContractValidationResult.
        validation.missing_required_columns = [
            column
            for column in validation.missing_required_columns
            if column != "dataset_id"
        ]
    if not validation.is_valid:
        raise ValueError(
            "Profiles do not satisfy the microscopy join contract: "
            + ", ".join(validation.missing_required_columns)
        )
    image_assets = pa.Table.from_pylist(scan_result_to_rows(scan_result))
    chunk_index = (
        pa.Table.from_pylist(scan_result_to_chunk_rows(scan_result))
        if include_chunks
        else None
    )
    return join_image_assets_with_profiles(
        image_assets,
        profiles,
        chunk_index=chunk_index,
        profile_dataset_id=profile_dataset_id,
    )
def join_profiles_with_store(
    uri: str,
    profiles: MetadataSource,
    *,
    include_chunks: bool = False,
    profile_dataset_id: str | None = None,
) -> pa.Table:
    """Scan a store and join its canonical image assets to profile rows.

    This helper uses the optional DuckDB integration at runtime. Install
    the `duckdb` extra/group before calling it.
    """
    scan_result = scan_store(uri)
    return join_profiles_with_scan_result(
        scan_result,
        profiles,
        include_chunks=include_chunks,
        profile_dataset_id=profile_dataset_id,
    )
def _validate_profiles(profiles: MetadataSource) -> ContractValidationResult:
    """Run the contract check appropriate to the profile source type."""
    if isinstance(profiles, (str, Path)):
        # Path-like inputs are validated by inspecting the table itself.
        return validate_microscopy_profile_table(str(profiles))
    if isinstance(profiles, pa.Table):
        return validate_microscopy_profile_columns(list(profiles.schema.names))
    if isinstance(profiles, list):
        # Rows may be sparse, so take the union of keys across all rows.
        keys: set[str] = set()
        for row in profiles:
            keys.update(row)
        return validate_microscopy_profile_columns(sorted(keys))
    raise TypeError(f"Unsupported profile source type: {type(profiles)!r}")


def _normalize_namespace_parts(namespace: str | Sequence[str]) -> tuple[str, ...]:
    """Split a dotted namespace string and drop any empty parts."""
    parts = namespace.split(".") if isinstance(namespace, str) else namespace
    return tuple(part for part in parts if part)