Source code for iceberg_bioimage.integrations.catalog

"""Catalog-facing helpers for reading canonical Iceberg metadata tables."""

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from typing import Protocol

import pyarrow as pa

from iceberg_bioimage.integrations.duckdb import (
    DEFAULT_JOIN_KEYS,
    MetadataSource,
    join_image_assets_with_profiles,
)
from iceberg_bioimage.publishing.image_assets import (
    _list_tables_with_namespace_fallback,
    _load_table_with_namespace_fallback,
    _resolve_catalog,
)


[docs] class SupportsIcebergScan(Protocol): """Protocol for pyiceberg scan objects."""
[docs] def to_arrow(self) -> pa.Table: """Materialize the scan as an Arrow table."""
[docs] class SupportsIcebergTable(Protocol): """Protocol for pyiceberg table objects."""
[docs] def scan( self, row_filter: str = "True", selected_fields: tuple[str, ...] = ("*",), case_sensitive: bool = True, snapshot_id: int | None = None, limit: int | None = None, ) -> SupportsIcebergScan: """Return a scan object for the current table."""
[docs] class SupportsScanCatalog(Protocol): """Protocol for catalogs used by the read-only integration helpers."""
[docs] def load_table(self, identifier: tuple[str, ...]) -> SupportsIcebergTable: """Load an existing Iceberg table."""
[docs] def list_tables(self, namespace: tuple[str, ...]) -> list[tuple[str, ...]]: """List tables within a namespace."""
[docs] @dataclass(frozen=True, slots=True) class CatalogScanOptions: """Options for scanning a catalog-backed metadata table.""" columns: Sequence[str] | None = None where: str | None = None snapshot_id: int | None = None limit: int | None = None
[docs] def load_catalog_table( catalog: str | SupportsScanCatalog, namespace: str | Sequence[str], table_name: str, ) -> SupportsIcebergTable: """Load a canonical metadata table from a catalog.""" resolved_catalog = _resolve_scan_catalog(catalog) return _load_table_with_namespace_fallback( resolved_catalog, namespace, table_name, operation="loading", )
[docs] def list_catalog_tables( catalog: str | SupportsScanCatalog, namespace: str | Sequence[str], ) -> list[str]: """List canonical metadata tables available in a catalog namespace.""" resolved_catalog = _resolve_scan_catalog(catalog) table_names = { identifier[-1] for identifier in _list_tables_with_namespace_fallback( resolved_catalog, namespace, ) } return sorted(table_names)
[docs] def catalog_table_to_arrow( catalog: str | SupportsScanCatalog, namespace: str | Sequence[str], table_name: str, *, scan_options: CatalogScanOptions | None = None, ) -> pa.Table: """Load a catalog table into Arrow via PyIceberg.""" options = CatalogScanOptions() if scan_options is None else scan_options columns = _normalize_columns(options.columns) table = load_catalog_table(catalog, namespace, table_name) scan = table.scan( row_filter="True" if options.where is None else options.where, selected_fields=(("*",) if columns is None else tuple(columns)), snapshot_id=options.snapshot_id, limit=options.limit, ) return scan.to_arrow()
[docs] def join_catalog_image_assets_with_profiles( catalog: str | SupportsScanCatalog, namespace: str | Sequence[str], profiles: MetadataSource, *, image_assets_table: str = "image_assets", chunk_index_table: str | None = None, join_keys: Sequence[str] = DEFAULT_JOIN_KEYS, image_assets_scan_options: CatalogScanOptions | None = None, chunk_index_scan_options: CatalogScanOptions | None = None, profile_dataset_id: str | None = None, ) -> pa.Table: """Join catalog-backed image metadata to a profile table. Args: catalog: Catalog name or catalog-like object. namespace: Namespace containing the metadata tables. profiles: Profile rows or table to join against. image_assets_table: Name of the canonical image-assets table. chunk_index_table: Optional chunk-index table name. join_keys: Join columns shared by image metadata and profiles. image_assets_scan_options: Optional scan options for image-assets reads. chunk_index_scan_options: Optional scan options for chunk-index reads. profile_dataset_id: Dataset identifier to inject for profile inputs that do not carry their own `dataset_id` column. Defaults to None. """ normalized_join_keys = _normalize_columns(join_keys) if not normalized_join_keys: raise ValueError("join_keys must be a non-empty sequence of column names.") join_keys = normalized_join_keys image_assets = catalog_table_to_arrow( catalog, namespace, image_assets_table, scan_options=image_assets_scan_options, ) chunk_index = None if chunk_index_table is not None: chunk_index = catalog_table_to_arrow( catalog, namespace, chunk_index_table, scan_options=chunk_index_scan_options, ) return join_image_assets_with_profiles( image_assets, profiles, join_keys=join_keys, chunk_index=chunk_index, profile_dataset_id=profile_dataset_id, )
def _normalize_columns(columns: Sequence[str] | None) -> Sequence[str] | None: if columns is None: return None if isinstance(columns, str): return [columns] return columns def _resolve_scan_catalog(catalog: str | SupportsScanCatalog) -> SupportsScanCatalog: resolved_catalog = _resolve_catalog(catalog) if not hasattr(resolved_catalog, "list_tables"): raise TypeError("Catalog must provide a list_tables(namespace) method.") return resolved_catalog