Out-of-core image analysis with dask

%matplotlib inline
%config InlineBackend.figure_format = 'retina'
%gui qt

dask.distributed

A first life hack: in general, avoid using bare dask, and instead create a dask.distributed Client as in the cell below. What this buys you:

  • memory management: the distributed scheduler that is automatically created with the default Client launches worker processes with a maximum memory limit. If a worker exceeds that limit, the scheduler stops sending it tasks and eventually kills and restarts it. Without distributed, you have no such safeguard, and, just as with bare Python, it is very easy to exhaust your RAM and freeze your machine.

  • a diagnostics dashboard. This can be invaluable in helping to understand performance in your application. We’ll see a live example below.

  • seamless scaling. Whether the scheduler is running local workers, your institution’s HPC cluster, or cloud compute, the API is the same: you just connect the Client to a different scheduler.

from dask import distributed
client = distributed.Client()
print(client.dashboard_link)
http://127.0.0.1:8787/status
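
The defaults are usually fine, but Client also accepts the same arguments as a LocalCluster, so you can set explicit limits yourself. The values below are only examples:

# optional: explicit worker count, threads per worker, and per-worker memory limit
# (example values; you do not need to run this if the default Client works for you)
client = distributed.Client(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='4GB',
)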
import numpy as np

random_image = np.random.random((512, 512))

import napari

napari.view_image(random_image)
<napari.viewer.Viewer at 0x7f9ce3e93fd0>
import dask.array as da

# yes we know Janelia has workstations with TB of RAM
# We ignore them for variable naming purposes. =P
impossible_image = da.random.random(
    (40_000, 2_000, 2_000),
    chunks=(1, 1_000, 1_000),
)

print(impossible_image.nbytes / 1e9)
1280.0
napari.view_image(impossible_image)
<napari.viewer.Viewer at 0x7f9c89c2d3d0>

Now with real images

We use data from the Cell Tracking Challenge, specifically the Fluo-N3DH-CE dataset, a developing C. elegans embryo imaged in 3D over time:

from dask_image.imread import imread
ROOT_PATH = '/Users/jni/data/Fluo-N3DH-CE/'
embryo = imread(ROOT_PATH + '01/t*.tif')
type(embryo)
dask.array.core.Array
embryo.shape
(250, 35, 512, 708)
embryo.dtype
dtype('uint8')
embryo.nbytes / 1e9
3.17184
embryo.chunksize
(1, 35, 512, 708)

Note: if you run into this issue with ghosting, you might want to subsample the embryo below with embryo[:, :, ::2, ::2], adjusting the scale correspondingly.

viewer = napari.view_image(
    embryo,
    scale=[10, 1, 1],
)
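
If you do hit that issue, the subsampled call might look like the sketch below. The scale is expressed in units of the original pixel size, so y and x become 2:

# alternative to the cell above: drop every second pixel in y and x
viewer = napari.view_image(
    embryo[:, :, ::2, ::2],
    scale=[10, 2, 2],
)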

Exercise: file formats

Open the dask dashboard, and view it while changing timepoints in napari. How long does loading a tiff file take?
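
If you prefer a number to eyeballing the dashboard, you can also time a single timepoint directly (the index 100 below is arbitrary):

# time how long it takes to load one timepoint from the tiff files
%time one_timepoint = np.asarray(embryo[100])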

If you have enough room in your drive, the zarr format offers much faster read from disk than tiff, especially for segmentations, which have very effective compression.

Use dask.array.to_zarr (or the .to_zarr method on the array) to save to a zarr store, and reload the array with dask.array.from_zarr. Swap out the image layer for the zarr-based one. How long does the load take now?
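
A minimal sketch, assuming you have write access next to the original data and that the image layer kept its auto-generated name:

# write each chunk of the array as a compressed chunk in a zarr store (example path)
embryo.to_zarr(ROOT_PATH + 'embryo.zarr')

# reload lazily; the chunk structure is preserved
embryo_zarr = da.from_zarr(ROOT_PATH + 'embryo.zarr')

# swap the data on the existing image layer
# (assumes the layer kept the auto-generated name 'embryo')
viewer.layers['embryo'].data = embryo_zarr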

You might want to check out the NGFF tutorial, too!

Adding the tracking data

Now, let’s view the tracking data. The track format is described in this pdf. You can also see a description of the workflow below without dask (i.e. with everything fitting in RAM) at this napari documentation page.

The tracklets are stored as individually-labelled pixels in volumes with the same shape as the original image. napari prefers to display tracks directly from coordinates, so we will use dask to convert from one representation to the other.

We are lucky: the images can be processed one at a time (which dask is perfect for), and the compressed data (just point coordinates) are much, much smaller and easily fit in RAM. We take advantage of this in the workflow below.

tracklet_images = imread(ROOT_PATH + '01_GT/TRA/man_track*.tif')

First, we define a function that will work on an individual volume, together with that volume’s index (i.e. the timepoint).

from skimage.measure import regionprops_table
import pandas as pd


def image_to_tracklets(volume, idx):
    props_dict = regionprops_table(
        np.asarray(volume), properties=('label', 'centroid')
    )
    props_df = pd.DataFrame(props_dict)
    props_df['frame'] = idx
    return props_df[
        ['label', 'frame', 'centroid-0', 'centroid-1', 'centroid-2']
    ]

Now we can run that function on every timepoint using the Client.map API. Futures are little IOUs for computation: a Future may or may not yet contain the result of the computation. Calling future.result() on a Future object causes Python to wait for that result to be ready. Otherwise, creating a Future is more or less instantaneous.

We will see later that futures have a .cancel() method — useful when you trigger a lot of computation but realise you want to stop it!
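
As a toy illustration of the Future lifecycle (the square function below is just for demonstration, not part of the workflow):

def square(x):
    # toy function just to illustrate submit/result/cancel
    return x ** 2

fut = client.submit(square, 4)   # returns a Future immediately
print(fut.result())              # blocks until the result (16) is ready

futs = client.map(square, range(1000))
for f in futs:
    f.cancel()                   # abandon work we no longer need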

futures = client.map(
    image_to_tracklets,
    tracklet_images,
    np.arange(len(tracklet_images)),
)
all_tables = [f.result() for f in futures]
tracklets = (
    pd.concat(all_tables)
    .reset_index(drop=True)
    .sort_values(['label', 'frame'])
)
tracklets_layer = viewer.add_tracks(tracklets, scale=[1, 10, 1, 1])

We also want to load the lineage information, which is presented in a table called man_track.txt containing the following four columns, called LBEP:

  • L - a unique label of the track (label of markers, 16-bit positive value)

  • B - a zero-based temporal index of the frame in which the track begins

  • E - a zero-based temporal index of the frame in which the track ends

  • P - label of the parent track (0 is used when no parent is defined)

lbep = np.loadtxt(
    ROOT_PATH + '01_GT/TRA/man_track.txt',
    dtype=np.uint,
)
# columns 0 and 3 of the table are L (track label) and P (parent label)
full_graph = dict(lbep[:, [0, 3]])
# napari expects the graph to contain only tracks that actually have a parent
graph = {k: v for k, v in full_graph.items() if v != 0}
tracks_layer = viewer.add_tracks(tracklets, graph=graph, scale=[1, 10, 1, 1])

Challenge

Our final goal will be to compute a segmentation from the grayscale image together with the points in the tracks. Just like last time, we will use the smoothed and thresholded nuclei as a mask, and we will use the track points (conveniently already in marker image format!) to run watershed on each timepoint.

We can use the dask-image library, which contains many functions adapted from scipy.ndimage, to do the smoothing:

from dask_image import ndfilters

smoothed = ndfilters.gaussian_filter(
    embryo,
    sigma=[0, 1, 10, 10],
)

smoothed_layer = viewer.add_image(
    smoothed,
    scale=[10, 1, 1],
)

And we can use dask.array.map_blocks to find the edges of the nuclei, just like in the previous notebook:

from skimage import filters


edges = da.map_blocks(filters.scharr, smoothed)

edges_layer = viewer.add_image(edges, scale=[10, 1, 1])

Final challenge: distributed segmentation with dask

  • Find threshold values for each timepoint of the smoothed data using client.map and a scikit-image thresholding function from skimage.filters, and create an array of the threshold values (see the sketch after this list for one way to start).

  • Using NumPy broadcasting, produce a Dask array containing the thresholded smoothed values. Add this array to napari.

  • (Optionally) use da.map_blocks with a custom filter function to find better boundaries of the nuclei. Add this array to napari.

  • Use da.map_blocks together with skimage.segmentation.watershed and the three previous arrays to create the output segmentation. Add this array as labels to napari.

  • Navigate the volume by clicking on the slider, and monitor the Dask dashboard.
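
To get you started, here is one possible sketch of the first two steps. It uses skimage.filters.threshold_li as an example; any of the skimage.filters.threshold_* functions would do:

from skimage import filters


def threshold_of_timepoint(volume):
    # volume is one (z, y, x) timepoint of the smoothed data
    return filters.threshold_li(np.asarray(volume))


threshold_futures = client.map(threshold_of_timepoint, smoothed)
thresholds = np.array(client.gather(threshold_futures))  # one value per timepoint

# broadcast the (t,) thresholds against the (t, z, y, x) smoothed array
thresholded = smoothed > thresholds[:, np.newaxis, np.newaxis, np.newaxis]
thresholded_layer = viewer.add_image(thresholded, scale=[10, 1, 1])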