# # Warehouse Namespace Demo
#
# This notebook shows a minimal end-to-end warehouse workflow for the
# cytomining ecosystem:
#
# - build a tiny Zarr store and a tiny OME-TIFF store
# - ingest both through the warehouse API
# - show the preferred Cytotable-compatible namespace layout
# - show how legacy namespaces are still supported
# - show how `pycytominer` and `coSMicQC`-style `Metadata_*` columns join to
#   the warehouse without manual renaming

from __future__ import annotations
import tempfile
import warnings
from pathlib import Path
import numpy as np
import pyarrow as pa
import tifffile
import zarr
from pyiceberg.exceptions import NoSuchTableError
from iceberg_bioimage import (
ingest_stores_to_warehouse,
join_profiles_with_store,
scan_store,
summarize_store,
)
class DemoTable:
    """In-memory stand-in for a warehouse table that records appended data."""

    def __init__(self) -> None:
        # Everything passed to ``append`` is kept here, in call order.
        self.appends: list[pa.Table] = []

    def append(self, table: pa.Table) -> None:
        """Record ``table`` as the most recent append; nothing is persisted."""
        self.appends.append(table)
class DemoCatalog:
    """Minimal in-memory catalog used to demonstrate warehouse ingestion.

    Tables are held in a plain dict keyed by identifier tuples. Every
    namespace and table creation request is logged so the demo can display
    what an ingest asked the catalog to create.
    """

    def __init__(
        self,
        tables: dict[tuple[str, ...], DemoTable] | None = None,
    ) -> None:
        # Shallow-copy a caller-supplied mapping so tables created later do
        # not show up in the caller's dict.
        self.tables = dict(tables) if tables is not None else {}
        self.created_namespaces: list[tuple[str, ...]] = []
        self.created_identifiers: list[tuple[str, ...]] = []

    def load_table(self, identifier: tuple[str, ...]) -> DemoTable:
        """Return the table stored under ``identifier``.

        Raises:
            NoSuchTableError: If no table is registered under ``identifier``.
        """
        if identifier in self.tables:
            return self.tables[identifier]
        raise NoSuchTableError(f"Missing table: {identifier!r}")

    def create_table(self, identifier: tuple[str, ...], schema: object) -> DemoTable:
        """Register and return a fresh ``DemoTable`` under ``identifier``.

        ``schema`` is accepted but not used by this demo catalog.

        Raises:
            ValueError: If a table already exists under ``identifier``.
        """
        if identifier in self.tables:
            raise ValueError(f"Table already exists: {identifier!r}")
        self.created_identifiers.append(identifier)
        self.tables[identifier] = DemoTable()
        return self.tables[identifier]

    def create_namespace_if_not_exists(self, namespace: tuple[str, ...]) -> None:
        """Log the namespace request; every call is recorded, repeats included."""
        self.created_namespaces.append(namespace)

    def list_tables(self, namespace: tuple[str, ...]) -> list[tuple[str, ...]]:
        """Return identifiers whose parent (all but the last part) equals ``namespace``."""
        return [key for key in self.tables if key[:-1] == namespace]
def warehouse_snapshot(catalog: DemoCatalog) -> dict[str, list[dict[str, object]]]:
    """Flatten every table in ``catalog`` into plain Python rows.

    Returns a dict keyed by the dotted table identifier, visited in sorted
    identifier order. Each value is the concatenation, in append order, of
    ``to_pylist()`` for every batch appended to that table.
    """
    tables = catalog.tables
    return {
        ".".join(identifier): [
            row
            for batch in tables[identifier].appends
            for row in batch.to_pylist()
        ]
        for identifier in sorted(tables)
    }
# Keep the TemporaryDirectory object alive at module scope instead of using a
# ``with`` block: the stores written here are read again by later cells
# (ingest, scan, join), and leaving a ``with`` block would delete them first.
# The directory is removed when this object is finalized; call
# ``_tmpdir_handle.cleanup()`` to remove it explicitly once the demo is done.
_tmpdir_handle = tempfile.TemporaryDirectory(prefix="iceberg-bioimage-demo-")
tmpdir_ctx = _tmpdir_handle.name
tmpdir = Path(tmpdir_ctx)

# Tiny Zarr v2 group: one (c, y, x) = (1, 4, 4) uint16 array at path "0",
# chunked as (1, 2, 2), plus a "multiscales" attribute that points at it.
zarr_path = tmpdir / "plate.zarr"
root = zarr.open_group(zarr_path, mode="w", zarr_version=2)
root.attrs["multiscales"] = [{"axes": ["c", "y", "x"], "datasets": [{"path": "0"}]}]
root.create_dataset(
    "0",
    shape=(1, 4, 4),
    data=np.arange(16, dtype=np.uint16).reshape(1, 4, 4),
    chunks=(1, 2, 2),
)

# Tiny TIFF file holding a (2, 3, 4) uint8 array.
tiff_path = tmpdir / "cells.ome.tiff"
tifffile.imwrite(tiff_path, np.arange(24, dtype=np.uint8).reshape(2, 3, 4))

zarr_summary = summarize_store(str(zarr_path)).to_dict()
tiff_summary = summarize_store(str(tiff_path)).to_dict()

# Cell output: both store summaries side by side.
{
    "zarr_summary": zarr_summary,
    "tiff_summary": tiff_summary,
}
# -
# ## Preferred Cytotable namespace
#
# When you ingest into namespace `bioimage`, this project prefers the
# Cytotable-compatible layout `bioimage.cytotable.*` for new warehouse tables.
# The two canonical tables are:
#
# - `image_assets`: one row per discovered image asset
# - `chunk_index`: one row per chunk when chunk metadata is available
#
# +
# Start from a catalog with no pre-existing tables and ingest both stores
# into the ``bioimage`` namespace.
catalog = DemoCatalog()
warehouse = ingest_stores_to_warehouse(
    [str(store) for store in (zarr_path, tiff_path)],
    catalog,
    "bioimage",
)

# Cell output: the ingest result, the namespaces and table identifiers the
# demo catalog recorded as created, and the rows appended to each table.
{
    "warehouse_result": warehouse.to_dict(),
    "created_namespaces": catalog.created_namespaces,
    "created_identifiers": catalog.created_identifiers,
    "warehouse_snapshot": warehouse_snapshot(catalog),
}
# -
# ## Legacy namespace fallback
#
# Existing warehouses may already store tables directly under `bioimage.*`.
# When those legacy tables already exist, the ingest path reuses them instead of
# creating a second copy under `bioimage.cytotable.*`, and it emits a warning so
# the layout difference is visible.
#
# +
# Seed the catalog with the two canonical tables directly under ``bioimage``
# so the ingest sees an existing legacy layout.
legacy_catalog = DemoCatalog(
    tables={
        ("bioimage", table_name): DemoTable()
        for table_name in ("image_assets", "chunk_index")
    }
)

# Capture every warning raised during the ingest ("always" disables the
# default once-per-location de-duplication) so it can be shown below.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy_result = ingest_stores_to_warehouse(
        [str(zarr_path)],
        legacy_catalog,
        "bioimage",
    )

# Cell output: the ingest result, the dotted identifiers now present in the
# catalog, and the text of each captured warning.
{
    "legacy_result": legacy_result.to_dict(),
    "legacy_identifiers": sorted(map(".".join, legacy_catalog.tables)),
    "warnings": [str(warning.message) for warning in caught],
}
# -
# ## Cytomining profile tables
#
# `pycytominer` and `coSMicQC` profile tables often use `Metadata_*` columns.
# The join and export paths normalize common aliases like:
#
# - `Metadata_dataset_id -> dataset_id`
# - `Metadata_ImageID -> image_id`
# - `Metadata_Plate -> plate_id`
# - `Metadata_Well -> well_id`
# - `Metadata_Site -> site_id`
#
# That means profile tables from the Cytomining ecosystem can join to the
# warehouse without manual renaming when those aliases are present.
#
# +
# Look up the first image asset found in the Zarr store so the profile row
# below can reference its image_id.
zarr_scan = scan_store(str(zarr_path))
zarr_asset = zarr_scan.image_assets[0]

# One-row profile table that uses ``Metadata_*`` column names rather than the
# warehouse column names.
profiles = pa.table(
    {
        "Metadata_dataset_id": ["plate"],
        "Metadata_ImageID": [zarr_asset.image_id],
        "Metadata_Plate": ["Plate-1"],
        "Metadata_Well": ["A01"],
        "Metadata_Site": ["1"],
        "cell_count": [42],
    }
)

joined = join_profiles_with_store(str(zarr_path), profiles)

# Cell output: the joined row, selected by the normalized column names.
joined.select(
    [
        "dataset_id",
        "image_id",
        "plate_id",
        "well_id",
        "site_id",
        "cell_count",
    ]
).to_pydict()