Source code for src.nviz.image

"""
Experiment with GFF image stacks to OME-ZARR with display in Napari.
"""

import os
import pathlib
from itertools import groupby
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import tifffile as tiff
import zarr
from ome_zarr.io import parse_url as zarr_parse_url
from ome_zarr.writer import write_image as zarr_write_image

from .image_meta import extract_z_slice_number_from_filename, generate_ome_xml


[docs] def image_set_to_arrays( image_dir: str, label_dir: Optional[str], channel_map: Dict[str, str], ignore: Optional[List[str]] = ["Merge"], ) -> Dict[str, Dict[str, np.ndarray]]: """ Read a set of images as an array of images. We follow a convention of splitting the following into separate nested dictionaries for use by other functions within this project. - "images": original images - "labels": images which represent objects of interest within the original "images" Args: image_dir (str): Directory containing TIFF image files. label_dir (Optional[str]): Directory containing label TIFF files. channel_map (Dict[str, str]): Mapping from filename codes to channel names. ignore (Optional[List[str]]): List of filename codes to ignore. Returns: Dict[str, Dict[str, np.ndarray]]: A dictionary containing two keys: "images" and "labels". Each key maps to another dictionary where the keys are channel names and the values are numpy arrays of images. """ # build a reference to the observations zstack_arrays = { "images": { channel_map.get(filename_code, f"Unknown_{filename_code}"): np.stack( [ tiff.imread(tiff_file.path).astype(np.uint16) for tiff_file in sorted( files, key=lambda x: extract_z_slice_number_from_filename(x.name), ) ] ).astype(np.uint16) for filename_code, files in groupby( sorted( [ file for file in os.scandir(image_dir) if (file.name.endswith(".tif") or file.name.endswith(".tiff")) and ( file.name.split("_")[1] not in ignore if ignore is not None else True ) ], key=lambda x: x.name.split("_")[1], ), key=lambda x: x.name.split("_")[1], ) } } if label_dir: zstack_arrays["labels"] = { f"{pathlib.Path(label_name).stem} (labels)": tiff.imread( next(iter(file)).path ).astype(np.uint16) for label_name, file in groupby( sorted( [ file for file in os.scandir(label_dir) if (file.name.endswith(".tif") or file.name.endswith(".tiff")) and ( file.name.split("_")[0] not in ignore if ignore is not None else True ) ], key=lambda x: x.name.split("_")[0], ), key=lambda x: x.name.split("_")[0], ) } return zstack_arrays
[docs] def tiff_to_zarr( # noqa: PLR0913 image_dir: str, label_dir: Optional[str], output_path: str, channel_map: Dict[str, str], scaling_values: Union[List[int], Tuple[int]], ignore: Optional[List[str]], ) -> str: """ Convert TIFF files to OME-Zarr format. Args: image_dir (str): Directory containing TIFF image files. label_dir (Optional[str]): Directory containing label TIFF files. output_path (str): Path to save the output OME-Zarr file. channel_map (Dict[str, str]): Mapping from filename codes to channel names. scaling_values (Union[List[int], Tuple[int]]): Scaling values for the images. ignore (Optional[List[str]]): List of filename codes to ignore. Returns: str: Path to the output OME-Zarr file. """ # except on dir already existing if pathlib.Path(output_path).is_dir(): raise FileExistsError( ( f"Output path {output_path} already exists." "Please remove before creating a new Zarr." ) ) if not pathlib.Path(image_dir).is_dir(): raise NotADirectoryError(f"Image directory {image_dir} does not exist.") # build a reference to the observations frame_zstacks = image_set_to_arrays( image_dir=image_dir, label_dir=label_dir, channel_map=channel_map, ignore=ignore ) # Parse URL and ensure store is compatible store = zarr_parse_url(output_path, mode="w").store # Ensure we are working with a Zarr group root = zarr.group(store, overwrite=True) # create scaling metadata scale_metadata = [ { "datasets": [ { "path": "0", # Path to the dataset "coordinateTransformations": [ { "type": "scale", "scale": list(scaling_values), } # Apply scaling values ], } ], "axes": [ { "name": "z", "unit": "micrometer", "type": "space", }, # Define the z-axis { "name": "y", "unit": "micrometer", "type": "space", }, # Define the y-axis { "name": "x", "unit": "micrometer", "type": "space", }, # Define the x-axis ], } ] # Write each channel separately to the Zarr file with no compression # Save images to OME-Zarr format images_group = root.create_group("images") for channel, stack in frame_zstacks["images"].items(): zarr_write_image( image=stack, group=(group := images_group.create_group(channel)), axes="zyx", # Specify the axes order for each channel dtype="uint16", # Ensure the dtype is set correctly scaler=None, # Disable scaler ) # Set the units attribute for the group to "micrometers" group.attrs["units"] = "micrometers" # Define the multiscales metadata for the group group.attrs["multiscales"] = scale_metadata if label_dir: # Save masks to OME-Zarr format labels_group = root.create_group("labels") for compartment_name, stack in frame_zstacks["labels"].items(): zarr_write_image( image=stack, group=(group := labels_group.create_group(compartment_name)), axes="zyx", # Specify the axes order for each mask dtype="uint16", # Ensure the dtype is set correctly scaler=None, # Disable scaler ) # Set the units attribute for the group to "micrometers" group.attrs["units"] = "micrometers" # Define the multiscales metadata for the group group.attrs["multiscales"] = scale_metadata return output_path
[docs] def tiff_to_ometiff( # noqa: PLR0913 image_dir: str, label_dir: Optional[str], output_path: str, channel_map: Dict[str, str], scaling_values: Union[List[int], Tuple[int]], ignore: Optional[List[str]], ) -> str: """ Convert TIFF files to OME-TIFF format. Args: image_dir (str): Directory containing TIFF image files. label_dir (Optional[str]): Directory containing label TIFF files. output_path (str): Path to save the output OME-TIFF file. channel_map (Dict[str, str]): Mapping from filename codes to channel names. scaling_values (Union[List[int], Tuple[int]]): Scaling values for the images. ignore (Optional[List[str]]): List of filename codes to ignore. Returns: str: Path to the output OME-TIFF file. """ # except on dir already existing if pathlib.Path(output_path).is_file(): raise FileExistsError( ( f"Output path {output_path} already exists." "Please remove before creating a new OME-TIFF." ) ) if not pathlib.Path(image_dir).is_dir(): raise NotADirectoryError(f"Image directory {image_dir} does not exist.") frame_zstacks = image_set_to_arrays( image_dir=image_dir, label_dir=label_dir, channel_map=channel_map, ignore=ignore ) # Prepare the data for writing images_data = [] labels_data = [] channel_names = [] label_names = [] # Collect image data for channel, stack in frame_zstacks["images"].items(): images_data.append(stack) channel_names.append(channel) # Collect label data if label_dir: for compartment_name, stack in frame_zstacks["labels"].items(): labels_data.append(stack) label_names.append(f"{compartment_name} (labels)") # Stack the images and labels along a new axis for channels images_data = np.stack(images_data, axis=0) if labels_data: labels_data = np.stack(labels_data, axis=0) combined_data = np.concatenate((images_data, labels_data), axis=0) combined_channel_names = channel_names + label_names else: combined_data = images_data combined_channel_names = channel_names # Generate OME-XML metadata ome_metadata = { "SizeC": combined_data.shape[0], "SizeZ": combined_data.shape[1], "SizeY": combined_data.shape[2], "SizeX": combined_data.shape[3], "PhysicalSizeX": scaling_values[2], "PhysicalSizeY": scaling_values[1], "PhysicalSizeZ": scaling_values[0], # note: we use 7-bit ascii compatible characters below # due to tifffile limitations "PhysicalSizeXUnit": "um", "PhysicalSizeYUnit": "um", "PhysicalSizeZUnit": "um", "Channel": [{"Name": name} for name in combined_channel_names], } ome_xml = generate_ome_xml(ome_metadata) # Write the combined data to a single OME-TIFF with tiff.TiffWriter(output_path, bigtiff=True) as tif: tif.write(combined_data, description=ome_xml, photometric="minisblack") return output_path