Source code for iceberg_bioimage.validation.contracts

"""Validation helpers for canonical scan objects and join contracts."""

from __future__ import annotations

import tomllib
from collections.abc import Mapping
from json import JSONDecodeError
from pathlib import Path

import pyarrow.dataset as ds

from iceberg_bioimage.models.scan_result import (
    ContractValidationResult,
    ScanResult,
    WarehouseValidationResult,
)

MICROSCOPY_REQUIRED_JOIN_KEYS = ("dataset_id", "image_id")
MICROSCOPY_RECOMMENDED_JOIN_KEYS = ("plate_id", "well_id", "site_id")
MICROSCOPY_PROFILE_COLUMN_ALIASES: dict[str, tuple[str, ...]] = {
    "dataset_id": (
        "Metadata_dataset_id",
        "Metadata_DatasetID",
        "Metadata_DatasetId",
        "Metadata_dataset",
        "Metadata_Source",
        "Metadata_source",
    ),
    "image_id": (
        "Metadata_image_id",
        "Metadata_ImageID",
        "Metadata_ImageId",
        "Metadata_image",
    ),
    "plate_id": (
        "Metadata_Plate",
        "Image_Metadata_Plate",
        "Metadata_plate",
    ),
    "well_id": (
        "Metadata_Well",
        "Image_Metadata_Well",
        "Metadata_well",
    ),
    "site_id": (
        "Metadata_Site",
        "Image_Metadata_Site",
        "Metadata_SiteNumber",
        "Metadata_Field",
        "Metadata_FOV",
        "Metadata_site",
    ),
}


[docs] def validate_scan_result(scan_result: ScanResult) -> list[str]: """Return validation errors for a scan result.""" errors: list[str] = [] if not scan_result.source_uri: errors.append("ScanResult.source_uri is required.") if not scan_result.image_assets: errors.append("ScanResult.image_assets must contain at least one asset.") for index, asset in enumerate(scan_result.image_assets): prefix = f"image_assets[{index}]" if not asset.uri: errors.append(f"{prefix}.uri is required.") if not asset.shape: errors.append(f"{prefix}.shape is required.") if not asset.dtype: errors.append(f"{prefix}.dtype is required.") return errors
[docs] def raise_for_invalid_scan_result(scan_result: ScanResult) -> None: """Raise a ValueError when a scan result is invalid.""" errors = validate_scan_result(scan_result) if errors: raise ValueError("Invalid ScanResult: " + "; ".join(errors))
[docs] def validate_microscopy_profile_columns( columns: list[str] | tuple[str, ...], *, target: str = "profile_table", alias_map: Mapping[str, tuple[str, ...] | list[str]] | None = None, ) -> ContractValidationResult: """Validate a schema against the microscopy join contract.""" present_columns = list(columns) resolved_columns = resolve_microscopy_profile_columns( present_columns, alias_map=alias_map, ) missing_required = [ column for column in MICROSCOPY_REQUIRED_JOIN_KEYS if resolved_columns[column] is None ] missing_recommended = [ column for column in MICROSCOPY_RECOMMENDED_JOIN_KEYS if resolved_columns[column] is None ] warnings = _profile_contract_warnings(resolved_columns) if missing_recommended: warnings.append( "Recommended microscopy join keys are missing: " + ", ".join(missing_recommended) ) return ContractValidationResult( target=target, present_columns=present_columns, required_columns=list(MICROSCOPY_REQUIRED_JOIN_KEYS), recommended_columns=list(MICROSCOPY_RECOMMENDED_JOIN_KEYS), missing_required_columns=missing_required, missing_recommended_columns=missing_recommended, warnings=warnings, )
[docs] def validate_microscopy_profile_table(path: str) -> ContractValidationResult: """Validate a local profile table file against the microscopy join contract.""" try: dataset = ds.dataset(path) except Exception as exc: error = f"Invalid dataset path: {path}: {exc}" return ContractValidationResult( target=str(Path(path)), present_columns=[], required_columns=list(MICROSCOPY_REQUIRED_JOIN_KEYS), recommended_columns=list(MICROSCOPY_RECOMMENDED_JOIN_KEYS), missing_required_columns=list(MICROSCOPY_REQUIRED_JOIN_KEYS), missing_recommended_columns=list(MICROSCOPY_RECOMMENDED_JOIN_KEYS), warnings=[error], ) return validate_microscopy_profile_columns( list(dataset.schema.names), target=str(Path(path)), )
[docs] def resolve_microscopy_profile_columns( columns: list[str] | tuple[str, ...], *, alias_map: Mapping[str, tuple[str, ...] | list[str]] | None = None, ) -> dict[str, str | None]: """Resolve canonical microscopy columns from a schema with known aliases.""" present_columns = set(columns) resolved: dict[str, str | None] = {} aliases = { canonical: list(known_aliases) for canonical, known_aliases in MICROSCOPY_PROFILE_COLUMN_ALIASES.items() } if alias_map is not None: for canonical, custom_aliases in alias_map.items(): aliases.setdefault(canonical, []).extend(custom_aliases) for column in ( *MICROSCOPY_REQUIRED_JOIN_KEYS, *MICROSCOPY_RECOMMENDED_JOIN_KEYS, ): if column in present_columns: resolved[column] = column continue resolved[column] = next( (alias for alias in aliases.get(column, ()) if alias in present_columns), None, ) return resolved
[docs] def profile_column_aliases() -> Mapping[str, tuple[str, ...]]: """Return the supported microscopy profile column aliases.""" return MICROSCOPY_PROFILE_COLUMN_ALIASES
[docs] def load_profile_column_aliases(path: str | Path) -> dict[str, tuple[str, ...]]: """Load microscopy profile column aliases from a TOML file.""" payload = tomllib.loads(Path(path).read_text()) alias_section = payload.get("microscopy", {}).get("aliases", {}) return { canonical: tuple(str(alias) for alias in aliases) for canonical, aliases in alias_section.items() }
[docs] def validate_warehouse_manifest(path: str | Path) -> WarehouseValidationResult: """Validate a manifest-backed warehouse root.""" from iceberg_bioimage.integrations.cytomining import load_warehouse_manifest root = Path(path) result = WarehouseValidationResult(warehouse_root=str(root)) manifest_path = root / "warehouse_manifest.json" if not manifest_path.exists(): result.errors.append("warehouse_manifest.json is missing.") return result try: manifest = load_warehouse_manifest(root) except (JSONDecodeError, KeyError, TypeError, ValueError) as exc: result.errors.append(f"Invalid warehouse_manifest.json: {exc}") return result seen_table_names: set[str] = set() for table in manifest.tables: if table.table_name in seen_table_names: result.errors.append( f"Duplicate table_name in manifest: {table.table_name}" ) continue seen_table_names.add(table.table_name) dataset_path = root / table.table_name if not dataset_path.exists(): result.errors.append( f"Manifest table path does not exist: {table.table_name}" ) continue if table.role in {"image_assets", "joined_profiles"} and not table.join_keys: result.errors.append( f"Manifest table {table.table_name} must declare join_keys." ) if not table.columns: result.warnings.append( f"Manifest table {table.table_name} does not record columns." ) return result
def _profile_contract_warnings(resolved_columns: dict[str, str | None]) -> list[str]: warnings: list[str] = [] aliased_columns = [ f"{canonical}<-{resolved}" for canonical, resolved in resolved_columns.items() if resolved is not None and canonical != resolved ] if aliased_columns: warnings.append( "Microscopy join keys will require alias normalization: " + ", ".join(aliased_columns) ) return warnings