Source code for OME_IRIS.verify

from __future__ import annotations

import argparse
from dataclasses import dataclass
import hashlib
from pathlib import Path

import yaml


@dataclass
class VerifyResult:
    """Outcome of a dataset verification run.

    Attributes are documented inline; the dataclass-generated ``__init__``,
    ``__repr__`` and ``__eq__`` are used as-is.
    """

    # True when verification produced no issues.
    ok: bool
    # Human-readable problem descriptions, one entry per detected issue.
    issues: list[str]
# Top-level keys every dataset manifest must define.
REQUIRED_FIELDS = {
    "id",
    "name",
    "description",
    "tier",
    "license",
    "source_identifier",
    "source",
    "formats",
    "files",
}

# Read size used while hashing, so large data files are never held in memory whole.
_HASH_CHUNK_SIZE = 1024 * 1024


def _sha256(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is streamed in fixed-size chunks rather than read in one go, so
    memory use stays bounded regardless of file size.

    Raises:
        OSError: If *path* cannot be opened or read.
    """
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(_HASH_CHUNK_SIZE), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _load_manifests(manifests_dir: Path) -> list[tuple[Path, dict]]:
    """Parse every ``*.yaml`` file directly inside *manifests_dir*.

    Returns:
        ``(path, parsed_content)`` pairs ordered by sorted path. The parsed
        content is whatever ``yaml.safe_load`` returns and is not validated
        here, so callers must check that it is actually a mapping.
    """
    return [
        (path, yaml.safe_load(path.read_text(encoding="utf-8")))
        for path in sorted(manifests_dir.glob("*.yaml"))
    ]


def _check_openable(path: Path) -> None:
    """Try to open *path* with a reader chosen by its (case-insensitive) suffix.

    ``.csv`` files are decoded as UTF-8, ``.parquet`` files are read with
    pandas, and common raster image suffixes are opened with Pillow. Any other
    suffix is accepted without being read. Failures surface as whatever
    exception the underlying reader raises.
    """
    suffix = path.suffix.lower()
    if suffix == ".csv":
        path.read_text(encoding="utf-8")
    elif suffix == ".parquet":
        import pandas as pd

        try:
            pd.read_parquet(path)
        except ImportError:
            # Optional parquet engines (pyarrow/fastparquet) may be unavailable
            # in lightweight environments; existence/checksum validation still applies.
            return
    elif suffix in {".tif", ".tiff", ".png", ".jpg", ".jpeg"}:
        from PIL import Image

        with Image.open(path) as _img:
            pass


def _is_valid_metadata_value(value: object) -> bool:
    """Return True if *value* consists only of supported metadata types.

    Supported values are ``None``, ``str``, ``int``, ``float``, ``bool``, lists
    of supported values, and dicts with string keys and supported values,
    checked recursively.
    """
    if isinstance(value, (str, int, float, bool)) or value is None:
        return True
    if isinstance(value, list):
        return all(_is_valid_metadata_value(item) for item in value)
    if isinstance(value, dict):
        return all(
            isinstance(key, str) and _is_valid_metadata_value(item)
            for key, item in value.items()
        )
    return False


def _validate_custom_metadata(payload: dict, owner: str, issues: list[str]) -> None:
    """Validate the optional ``custom_metadata`` entry of *payload*.

    Nothing is reported when the key is absent. Otherwise the value must be a
    dict containing only supported metadata values; problems are appended to
    *issues*, prefixed with *owner*.
    """
    if "custom_metadata" not in payload:
        return
    custom_metadata = payload["custom_metadata"]
    if not isinstance(custom_metadata, dict):
        issues.append(f"{owner}: custom_metadata must be an object")
        return
    if not _is_valid_metadata_value(custom_metadata):
        issues.append(f"{owner}: custom_metadata contains unsupported value types")


def _validate_relationships(
    manifest: dict, manifest_name: str, issues: list[str]
) -> None:
    """Validate the optional ``relationships`` list of *manifest*.

    Each relationship must be a dict with non-blank string ``from``, ``to``,
    ``type`` and ``rocrate_predicate`` entries; the predicate must start with
    ``http://`` or ``https://``; and both endpoints must match the ``path`` of
    a record in the manifest's ``files``. Only the first structural problem of
    a relationship is reported, while unknown ``from`` and ``to`` paths are
    both reported. Problems are appended to *issues*.
    """
    relationships = manifest.get("relationships")
    if relationships is None:
        return
    if not isinstance(relationships, list):
        issues.append(f"{manifest_name}: relationships must be a list")
        return

    # "files" may be present but null or a scalar in a malformed manifest;
    # treat that as "no known paths" instead of failing while iterating it.
    files = manifest.get("files")
    file_paths: set[str] = set()
    if isinstance(files, (list, tuple)):
        for file_rec in files:
            if isinstance(file_rec, dict) and isinstance(file_rec.get("path"), str):
                file_paths.add(file_rec["path"])

    for idx, rel in enumerate(relationships):
        owner = f"{manifest_name}:relationships[{idx}]"
        if not isinstance(rel, dict):
            issues.append(f"{owner}: relationship must be an object")
            continue
        source = rel.get("from")
        target = rel.get("to")
        rel_type = rel.get("type")
        if not isinstance(source, str) or not source.strip():
            issues.append(f"{owner}: missing required 'from'")
            continue
        if not isinstance(target, str) or not target.strip():
            issues.append(f"{owner}: missing required 'to'")
            continue
        if not isinstance(rel_type, str) or not rel_type.strip():
            issues.append(f"{owner}: missing required 'type'")
            continue
        rocrate_predicate = rel.get("rocrate_predicate")
        if not isinstance(rocrate_predicate, str) or not rocrate_predicate.strip():
            issues.append(f"{owner}: missing required 'rocrate_predicate'")
            continue
        if not rocrate_predicate.startswith(("http://", "https://")):
            issues.append(f"{owner}: rocrate_predicate must be an absolute URI")
            continue
        if source not in file_paths:
            issues.append(f"{owner}: unknown 'from' path: {source}")
        if target not in file_paths:
            issues.append(f"{owner}: unknown 'to' path: {target}")
def _verify_file_record(
    file_rec: object,
    manifest_name: str,
    source_identifier: str,
    data_dir: Path,
    issues: list[str],
) -> None:
    """Check one entry of a manifest's ``files`` list against local data."""
    if not isinstance(file_rec, dict):
        # A scalar or list entry has no .get(); report it instead of crashing.
        issues.append(f"{manifest_name}: file record must be an object")
        return
    _validate_custom_metadata(file_rec, f"{manifest_name}:file", issues)

    kind = file_rec.get("kind", "file")
    rel_path = file_rec.get("path")
    if not isinstance(rel_path, str) or not rel_path:
        # Non-string paths cannot be joined onto data_dir, so treat them as missing.
        issues.append(f"{manifest_name}: file record missing path")
        return

    target = data_dir / source_identifier / rel_path
    reported_path = f"{source_identifier}/{rel_path}"

    if kind == "directory":
        # is_dir() is False for non-existent paths, so it also covers existence.
        if not target.is_dir() or not any(target.iterdir()):
            issues.append(f"{reported_path}: missing local directory content")
        return

    if not target.exists():
        issues.append(f"{reported_path}: missing local file")
        return

    # hexdigest() is lowercase, so normalise the manifest value before comparing.
    expected = str(file_rec.get("sha256") or "").strip().lower()
    if expected:
        try:
            actual = _sha256(target)
        except OSError as exc:
            # E.g. the target is a directory or unreadable: record it and move on
            # rather than aborting verification of every remaining dataset.
            issues.append(f"{reported_path}: failed to open ({exc})")
            return
        if actual != expected:
            issues.append(f"{reported_path}: checksum mismatch")
            return

    try:
        _check_openable(target)
    except Exception as exc:  # noqa: BLE001
        # Readers raise library-specific exceptions; any failure is an issue.
        issues.append(f"{reported_path}: failed to open ({exc})")


def verify_datasets(manifests_dir: Path, data_dir: Path) -> VerifyResult:
    """Verify local dataset files against the YAML manifests in *manifests_dir*.

    Per manifest, structural problems (not a mapping, missing required fields,
    empty ``source_identifier``, ``files`` not a list) are reported and the
    manifest is skipped. Otherwise custom metadata and relationships are
    validated and each file record is checked under
    ``data_dir / source_identifier / path``: existence, non-empty content for
    ``kind == "directory"``, an optional SHA-256 checksum, and whether the file
    can be opened.

    Args:
        manifests_dir: Directory containing the ``*.yaml`` manifests.
        data_dir: Root directory holding the downloaded dataset files.

    Returns:
        A ``VerifyResult`` whose ``ok`` is True only when no issues were found.
    """
    issues: list[str] = []
    for manifest_path, manifest in _load_manifests(manifests_dir):
        manifest_name = manifest_path.name

        # An empty or scalar YAML document does not parse to a mapping.
        if not isinstance(manifest, dict):
            issues.append(f"{manifest_name}: manifest must be a mapping")
            continue

        missing = REQUIRED_FIELDS.difference(manifest.keys())
        if missing:
            issues.append(
                f"{manifest_name}: missing required fields: {sorted(missing)}"
            )
            continue

        # A null value must count as empty, not as the literal string "None".
        raw_identifier = manifest.get("source_identifier")
        source_identifier = (
            "" if raw_identifier is None else str(raw_identifier).strip()
        )
        if not source_identifier:
            issues.append(
                f"{manifest_name}: source_identifier must be a non-empty string"
            )
            continue

        # Checked before relationship validation, which also walks "files".
        files = manifest.get("files")
        if not isinstance(files, list):
            issues.append(f"{manifest_name}: files must be a list")
            continue

        _validate_custom_metadata(manifest, manifest_name, issues)
        _validate_relationships(manifest, manifest_name, issues)

        source = manifest.get("source", {})
        if isinstance(source, dict):
            _validate_custom_metadata(source, f"{manifest_name}:source", issues)

        for file_rec in files:
            _verify_file_record(
                file_rec, manifest_name, source_identifier, data_dir, issues
            )

    return VerifyResult(ok=not issues, issues=issues)
def main() -> int:
    """Run dataset verification from the command line.

    Parses ``--manifests-dir`` and ``--data-dir``, runs ``verify_datasets`` and
    prints the outcome, listing each issue on its own line on failure.

    Returns:
        ``0`` when verification passed, ``1`` otherwise.
    """
    cli = argparse.ArgumentParser(description="Verify OME-IRIS datasets")
    cli.add_argument("--manifests-dir", default="src/OME_IRIS/data/datasets")
    cli.add_argument("--data-dir", default="data")
    options = cli.parse_args()

    outcome = verify_datasets(
        manifests_dir=Path(options.manifests_dir),
        data_dir=Path(options.data_dir),
    )

    if not outcome.ok:
        print("Verification failed")
        for problem in outcome.issues:
            print(f"- {problem}")
        return 1

    print("Verification passed")
    return 0
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())