{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "1a67ed2f",
   "metadata": {},
   "source": [
    "# Warehouse Namespace Demo\n",
    "\n",
    "This notebook shows a minimal end-to-end warehouse workflow for the\n",
    "`cytomining` ecosystem:\n",
    "\n",
    "- build a tiny Zarr store and a tiny OME-TIFF store\n",
    "- ingest both through the warehouse API\n",
    "- show the preferred Cytotable-compatible namespace layout\n",
    "- show how legacy namespaces are still supported\n",
    "- show how `pycytominer` and `coSMicQC`-style `Metadata_*` columns join to\n",
    "  the warehouse without manual renaming\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "df6c3d1a",
   "metadata": {},
   "outputs": [],
   "source": [
    "from __future__ import annotations\n",
    "\n",
    "import tempfile\n",
    "import warnings\n",
    "from pathlib import Path\n",
    "\n",
    "import numpy as np\n",
    "import pyarrow as pa\n",
    "import tifffile\n",
    "import zarr\n",
    "from pyiceberg.exceptions import NoSuchTableError\n",
    "\n",
    "from iceberg_bioimage import (\n",
    "    ingest_stores_to_warehouse,\n",
    "    join_profiles_with_store,\n",
    "    scan_store,\n",
    "    summarize_store,\n",
    ")\n",
    "\n",
    "\n",
    "class DemoTable:\n",
    "    \"\"\"In-memory table stand-in that records every appended Arrow table.\"\"\"\n",
    "\n",
    "    def __init__(self) -> None:\n",
    "        # Appended tables are kept in call order so they can be inspected later.\n",
    "        self.appends: list[pa.Table] = []\n",
    "\n",
    "    def append(self, table: pa.Table) -> None:\n",
    "        \"\"\"Record ``table``; nothing is written to disk.\"\"\"\n",
    "        self.appends.append(table)\n",
    "\n",
    "\n",
    "class DemoCatalog:\n",
    "    \"\"\"Dictionary-backed catalog stand-in passed to the ingest calls below.\n",
    "\n",
    "    Tables are keyed by identifier tuple. ``created_namespaces`` and\n",
    "    ``created_identifiers`` log every creation request so the notebook can\n",
    "    show which layout the ingest path chose.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, tables: dict[tuple[str, ...], DemoTable] | None = None) -> None:\n",
    "        # Copy the mapping so the caller's dict is not mutated by create_table.\n",
    "        self.tables = {} if tables is None else dict(tables)\n",
    "        self.created_namespaces: list[tuple[str, ...]] = []\n",
    "        self.created_identifiers: list[tuple[str, ...]] = []\n",
    "\n",
    "    def load_table(self, identifier: tuple[str, ...]) -> DemoTable:\n",
    "        \"\"\"Return the table stored under ``identifier``.\n",
    "\n",
    "        Raises:\n",
    "            NoSuchTableError: if no table is registered under ``identifier``.\n",
    "        \"\"\"\n",
    "        if identifier not in self.tables:\n",
    "            raise NoSuchTableError(f\"Missing table: {identifier!r}\")\n",
    "        return self.tables[identifier]\n",
    "\n",
    "    def create_table(self, identifier: tuple[str, ...], schema: object) -> DemoTable:\n",
    "        \"\"\"Register and return a new empty table; ``schema`` is accepted but unused.\n",
    "\n",
    "        Raises:\n",
    "            ValueError: if a table already exists under ``identifier``.\n",
    "        \"\"\"\n",
    "        if identifier in self.tables:\n",
    "            raise ValueError(f\"Table already exists: {identifier!r}\")\n",
    "        self.created_identifiers.append(identifier)\n",
    "        table = DemoTable()\n",
    "        self.tables[identifier] = table\n",
    "        return table\n",
    "\n",
    "    def create_namespace_if_not_exists(self, namespace: tuple[str, ...]) -> None:\n",
    "        \"\"\"Log the namespace request; every call is recorded, including repeats.\"\"\"\n",
    "        self.created_namespaces.append(namespace)\n",
    "\n",
    "    def list_tables(self, namespace: tuple[str, ...]) -> list[tuple[str, ...]]:\n",
    "        \"\"\"Return identifiers whose parent (all but the last part) is ``namespace``.\"\"\"\n",
    "        return [\n",
    "            identifier for identifier in self.tables if identifier[:-1] == namespace\n",
    "        ]\n",
    "\n",
    "\n",
    "def warehouse_snapshot(catalog: DemoCatalog) -> dict[str, list[dict[str, object]]]:\n",
    "    \"\"\"Flatten a catalog into ``{\"dotted.identifier\": [row dicts]}`` for display.\n",
    "\n",
    "    Tables are visited in sorted identifier order; rows from successive\n",
    "    appends are concatenated in append order.\n",
    "    \"\"\"\n",
    "    snapshot: dict[str, list[dict[str, object]]] = {}\n",
    "    for identifier, table in sorted(catalog.tables.items()):\n",
    "        rows: list[dict[str, object]] = []\n",
    "        for appended in table.appends:\n",
    "            rows.extend(appended.to_pylist())\n",
    "        snapshot[\".\".join(identifier)] = rows\n",
    "    return snapshot"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e1b7a2c4",
   "metadata": {},
   "source": [
    "## Build the demo stores\n",
    "\n",
    "Create a tiny Zarr store and a tiny OME-TIFF store in a temporary directory,\n",
    "then summarize each one. The directory is kept alive across cells and is\n",
    "removed by the final cell of the notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7d623cd0",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Hold on to the TemporaryDirectory object instead of using a `with` block:\n",
    "# the stores must outlive this cell, and expressions nested inside a `with`\n",
    "# body are never displayed. The last cell calls `tmpdir_ctx.cleanup()`.\n",
    "tmpdir_ctx = tempfile.TemporaryDirectory(prefix=\"iceberg-bioimage-demo-\")\n",
    "tmpdir = Path(tmpdir_ctx.name)\n",
    "\n",
    "zarr_path = tmpdir / \"plate.zarr\"\n",
    "root = zarr.open_group(zarr_path, mode=\"w\", zarr_version=2)\n",
    "root.attrs[\"multiscales\"] = [{\"axes\": [\"c\", \"y\", \"x\"], \"datasets\": [{\"path\": \"0\"}]}]\n",
    "root.create_dataset(\n",
    "    \"0\",\n",
    "    shape=(1, 4, 4),\n",
    "    data=np.arange(16, dtype=np.uint16).reshape(1, 4, 4),\n",
    "    chunks=(1, 2, 2),\n",
    ")\n",
    "\n",
    "tiff_path = tmpdir / \"cells.ome.tiff\"\n",
    "tifffile.imwrite(tiff_path, np.arange(24, dtype=np.uint8).reshape(2, 3, 4))\n",
    "\n",
    "zarr_summary = summarize_store(str(zarr_path)).to_dict()\n",
    "tiff_summary = summarize_store(str(tiff_path)).to_dict()\n",
    "\n",
    "{\n",
    "    \"zarr_summary\": zarr_summary,\n",
    "    \"tiff_summary\": tiff_summary,\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5f0a9d3e",
   "metadata": {},
   "source": [
    "## Preferred Cytotable namespace\n",
    "\n",
    "When you ingest into namespace `bioimage`, this project prefers the\n",
    "Cytotable-compatible layout `bioimage.cytotable.*` for new warehouse tables.\n",
    "The two canonical tables are:\n",
    "\n",
    "- `image_assets`: one row per discovered image asset\n",
    "- `chunk_index`: one row per chunk when chunk metadata is available"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8c41e6b2",
   "metadata": {},
   "outputs": [],
   "source": [
    "catalog = DemoCatalog()\n",
    "warehouse = ingest_stores_to_warehouse(\n",
    "    [str(zarr_path), str(tiff_path)],\n",
    "    catalog,\n",
    "    \"bioimage\",\n",
    ")\n",
    "\n",
    "{\n",
    "    \"warehouse_result\": warehouse.to_dict(),\n",
    "    \"created_namespaces\": catalog.created_namespaces,\n",
    "    \"created_identifiers\": catalog.created_identifiers,\n",
    "    \"warehouse_snapshot\": warehouse_snapshot(catalog),\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a3d97f10",
   "metadata": {},
   "source": [
    "## Legacy namespace fallback\n",
    "\n",
    "Existing warehouses may already store tables directly under `bioimage.*`.\n",
    "When those legacy tables already exist, the ingest path reuses them instead of\n",
    "creating a second copy under `bioimage.cytotable.*`, and it emits a warning so\n",
    "the layout difference is visible."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2be6c5f8",
   "metadata": {},
   "outputs": [],
   "source": [
    "legacy_catalog = DemoCatalog(\n",
    "    tables={\n",
    "        (\"bioimage\", \"image_assets\"): DemoTable(),\n",
    "        (\"bioimage\", \"chunk_index\"): DemoTable(),\n",
    "    }\n",
    ")\n",
    "\n",
    "# Record warnings instead of printing them so they can be shown in the result.\n",
    "with warnings.catch_warnings(record=True) as caught:\n",
    "    warnings.simplefilter(\"always\")\n",
    "    legacy_result = ingest_stores_to_warehouse(\n",
    "        [str(zarr_path)],\n",
    "        legacy_catalog,\n",
    "        \"bioimage\",\n",
    "    )\n",
    "\n",
    "{\n",
    "    \"legacy_result\": legacy_result.to_dict(),\n",
    "    \"legacy_identifiers\": sorted(\n",
    "        \".\".join(identifier) for identifier in legacy_catalog.tables\n",
    "    ),\n",
    "    \"warnings\": [str(item.message) for item in caught],\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d70b14a9",
   "metadata": {},
   "source": [
    "## Cytomining profile tables\n",
    "\n",
    "`pycytominer` and `coSMicQC` profile tables often use `Metadata_*` columns.\n",
    "The join and export paths normalize common aliases like:\n",
    "\n",
    "- `Metadata_dataset_id -> dataset_id`\n",
    "- `Metadata_ImageID -> image_id`\n",
    "- `Metadata_Plate -> plate_id`\n",
    "- `Metadata_Well -> well_id`\n",
    "- `Metadata_Site -> site_id`\n",
    "\n",
    "That means profile tables from the Cytomining ecosystem can join to the\n",
    "warehouse without manual renaming when those aliases are present."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6e2f80c7",
   "metadata": {},
   "outputs": [],
   "source": [
    "zarr_scan = scan_store(str(zarr_path))\n",
    "zarr_asset = zarr_scan.image_assets[0]\n",
    "\n",
    "profiles = pa.table(\n",
    "    {\n",
    "        \"Metadata_dataset_id\": [\"plate\"],\n",
    "        \"Metadata_ImageID\": [zarr_asset.image_id],\n",
    "        \"Metadata_Plate\": [\"Plate-1\"],\n",
    "        \"Metadata_Well\": [\"A01\"],\n",
    "        \"Metadata_Site\": [\"1\"],\n",
    "        \"cell_count\": [42],\n",
    "    }\n",
    ")\n",
    "\n",
    "joined = join_profiles_with_store(str(zarr_path), profiles)\n",
    "joined.select(\n",
    "    [\"dataset_id\", \"image_id\", \"plate_id\", \"well_id\", \"site_id\", \"cell_count\"]\n",
    ").to_pydict()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9a5c3e61",
   "metadata": {},
   "source": [
    "## Clean up\n",
    "\n",
    "Remove the temporary directory that holds the demo stores."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c04d7b95",
   "metadata": {},
   "outputs": [],
   "source": [
    "tmpdir_ctx.cleanup()"
   ]
  }
 ],
 "metadata": {
  "jupytext": {
   "formats": "ipynb,py:light"
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}