Practical 1: Dask basics#

Delaying function execution using dask delayed#

The core purpose of dask is to organize how and when computations are performed.

One of its central functionalities is delaying the execution of arbitrary Python functions until their results are actually required.

How does dask achieve this in practice?

Consider a function that performs a computation, i.e. one that takes inputs and returns a result.

def add(x1, x2):
    return x1 + x2

# returns 3
add(1, 2)

To gain control over how and when this function call is executed, dask wraps the function in another function that takes the same inputs but, instead of performing the computation, returns an object that represents it. This is done with dask.delayed.

from dask import delayed

delayed_add = delayed(add)
delayed_add

When we call this delayed function with inputs, no computation is performed yet; instead, an object is returned that encodes the function and its inputs.

delayed_result = delayed_add(1, 2)
delayed_result

Calling .compute() on the delayed object (or dask.compute(delayed_result)) triggers the execution of the function.

a = delayed_result.compute()
print(a)
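The same result can also be obtained with dask.compute, which accepts one or several delayed objects at once. A minimal sketch, reusing the delayed_add defined above:

import dask

# computes both delayed objects in one go and returns a tuple of results
b, c = dask.compute(delayed_add(1, 2), delayed_add(3, 4))
print(b, c)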

Calls to delayed functions can be chained and combined.#

# f(x, y) = x^2 + y^2

def add(x1, x2):
    return x1 + x2

def square(x):
    return x ** 2

delayed_square = delayed(square)
delayed_add = delayed(add)

x, y = 2, 3
result = add(square(x), square(y))                                  # eager: computed immediately
delayed_result = delayed_add(delayed_square(x), delayed_square(y))  # lazy: only builds a task graph

delayed_result, delayed_result.compute()

Visualizing the computations performed within a delayed object#

delayed_result.visualize(optimize_graph=False, engine='cytoscape')

More details and examples: https://tutorial.dask.org/03_dask.delayed.html

Dask arrays#

A dask array is an object that represents many tiled numpy arrays.#

Just like a delayed object, a dask array also represents a delayed computation. More specifically, it represents one delayed computation for each tiled array that it contains. It is therefore a content-aware delayed object.

The tiled arrays composing a dask array are referred to as “chunks”; they typically all have the same shape, but their shapes can also vary.

Dask arrays can be created in different ways, e.g.

  • with array creation functions that match those available in numpy

  • by tiling an existing numpy array (sketched below)

  • from delayed objects (sketched below)
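As a minimal sketch of the last two options (the small shapes are chosen only for illustration):

import numpy as np
import dask.array as da
from dask import delayed

# tile an existing numpy array into (2, 2)-shaped chunks
np_arr = np.arange(16).reshape(4, 4)
im_from_np = da.from_array(np_arr, chunks=(2, 2))

# wrap a delayed computation as a (single-chunk) dask array;
# shape and dtype have to be given explicitly, since dask cannot inspect the result
delayed_ones = delayed(np.ones)((4, 4))
im_from_delayed = da.from_delayed(delayed_ones, shape=(4, 4), dtype=float)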

Here we create a dask array containing ones in the same way we’d do with np.ones. In addition to the shape, we can indicate the size of the individual chunks for the array to be created.

In notebooks, dask arrays are represented using schematics that indicate

  • their total shape

  • the shape of their chunks

  • how many computations they depend on

  • their data type.

# imports

import numpy as np
import zarr
import dask
import dask.array as da
im_da = da.ones((10000, 10000), chunks=(1000, 1000))
im_da
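To see that im_da indeed consists of one delayed computation per chunk, we can inspect its block structure; .to_delayed() returns one delayed object per chunk (a small sketch):

# number of chunks along each axis (here 10 x 10 chunks of shape (1000, 1000))
print(im_da.numblocks)

# one delayed object per chunk
im_da_delayed = im_da.to_delayed()
print(im_da_delayed.shape, type(im_da_delayed[0, 0]))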

Dask arrays are intended to be a natural extension of numpy arrays and therefore behave very similarly.

For example, dask arrays can be

  • indexed and sliced

  • used to perform manipulations and computations

# Slicing a dask array

im_da[:10, 0]

Performing calculations with dask arrays

  • the results of computations are again dask arrays

  • most numpy functions are supported

im_da_result = np.sin(im_da[:, 0]) * im_da.T
im_da_result

Dask arrays are lazy.#

This is an important aspect of dask arrays and means that the data (contained in the chunks) is not loaded into memory until it is needed. This is in contrast to numpy arrays, which are “eager”, meaning that the data is loaded into memory when the array is created.

# this gives us the schematic representation of the array, but no data is loaded yet

im_da_result[:10, :10]

Calling .compute() on the dask array will

  • perform the calculations defined in the dask array

  • return the result as a numpy array.

# this gives a numpy array as a result

im_da_result[:5, :5].compute()

Computations with dask arrays are encoded in dask graphs.#

Just like delayed objects, computations in dask arrays are represented by graphs. In this case, the graph encodes for each chunk which functions need to be applied to which data in order to compute it.

# Let's create a dask array and perform a computation on it.

# This could be a time lapse of images (TXY), for example.
im_da = da.ones((3, 1000, 1000), chunks=(1, 600, 600))

# Let's calculate the max intensity
im_da_max = im_da.max(axis=(1, 2))
# im_da_max = im_da.max()

# Let's normalize the images by the max intensities
im_da_normalized = im_da / im_da_max[:, None, None]
# im_da_normalized = im_da / im_da_max#[:, None, None]

im_da_normalized
# We can visualize the computation graph

dask.visualize(im_da_normalized, optimize_graph=False)
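The size of the graph can also be inspected directly; each chunk of each intermediate array contributes tasks (a small sketch using the __dask_graph__ protocol):

# total number of tasks in the underlying task graph
print(len(im_da_normalized.__dask_graph__()))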

Schedulers#

As we’ve seen, dask objects generate task graphs where each node in the graph is a Python function and edges between nodes are objects that are created by one task as outputs and used as inputs in another task. After Dask generates these task graphs, it needs to execute them on parallel hardware. This is the job of a task scheduler. Different task schedulers exist, and each will consume a task graph and compute the same result, but with different performance characteristics.

Dask has two types of task schedulers:

Single-machine scheduler: This scheduler provides basic features on a local process or thread pool. This scheduler was made first and is the default. It is simple and cheap to use, although it can only be used on a single machine and does not scale.

Distributed scheduler: This scheduler is more sophisticated, offers more features, but also requires a bit more effort to set up. It can run locally or distributed across a cluster.

(Figure: dask-overview.svg)

# single-machine scheduler:

da.ones((10, 10)).compute(scheduler='single-threaded')
da.ones((10, 10)).compute(scheduler='threads') # default
da.ones((10, 10)).compute(scheduler='processes')
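Instead of passing scheduler= to every compute() call, the single-machine scheduler can also be selected for a block of code or globally via dask.config.set (a sketch):

# select the scheduler for a block of code only
with dask.config.set(scheduler='processes'):
    da.ones((10, 10)).sum().compute()

# or set it globally for the rest of the session
dask.config.set(scheduler='threads')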
# distributed scheduler

from dask.distributed import LocalCluster, Client

local_cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=8,
)
client = Client(local_cluster)
client
# Execute this line and follow the computation progress in the dask dashboard (link in cell output above)

da.ones([10000] * 3, chunks=[2000] * 3).T.sum().compute()
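Once you are done with the distributed scheduler (e.g. at the end of the practical), the client and cluster can be shut down again; until then, newly created dask objects will use it by default:

# shut down the distributed scheduler and fall back to the single-machine scheduler
client.close()
local_cluster.close()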

How is this useful?#

Example: working with zarr files / images#

Zarr is a storage format for chunked, compressed N-dimensional arrays / images, i.e. it reads and writes arrays in chunks. With its associated Python library, data is accessed on demand, and only the required chunks of an image are read or written.

Let’s create a large(ish) example zarr image file:

# create a large example image file

filename = 'data/large_example_image.zarr'

# initialise a zarr file
im_zarr = zarr.open(filename, mode='w', shape=(10000, 10000),
               chunks=(1000, 1000), dtype='i4')

# write random data to it
im_zarr[:, :] = np.random.randint(0, 100, im_zarr.shape)
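The file can then be re-opened in read mode; thanks to the chunked layout, only the chunks overlapping a requested region are read from disk (a small sketch):

# re-open the file in read-only mode
im_zarr_read = zarr.open(filename, mode='r')

# shape and chunk layout as stored in the file
print(im_zarr_read.shape, im_zarr_read.chunks)

# reading a small slice only touches the chunk(s) that overlap it
print(im_zarr_read[:5, :5])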

Accessing the image and processing it#

# the data is available as a zarr array
# (note: zarr arrays do not implement arithmetic themselves,
# so this line is expected to raise an error)

im_zarr * 2
# to compute things we can convert the zarr array into a numpy array

im_np = np.array(im_zarr) # or `im_zarr[:]`

im_np * 2
# or a dask array using the dask.array.from_zarr function

im_da = da.from_zarr(im_zarr)

im_da * 2
# to obtain computation results we need to call im_da.compute()
(im_da * 2).compute()

How do the numpy and dask arrays compare in terms of timing?#

%%timeit -r 1

# Perform an operation after loading the full zarr array into numpy

(np.array(im_zarr) ** 2).sum()
%%timeit -r 1

# Perform the same operation using a dask array defined from the zarr array

(im_da ** 2).sum().compute()
%%timeit -r 1

# Accessing an element of the zarr array

im_zarr[0,0]
%%time

# Accessing an element of the dask array

im_da[0,0].compute() # to obtain actual values we again need to .compute()
%%timeit

# Perform the operation first and access the element later
# numpy array

(im_np * 10)[0,0]
%%timeit

# Perform the operation first and access the element later
# dask array

(im_da * 10)[0,0].compute()

How to determine chunk size?#

Too large:

  • higher memory requirements

  • not enough parallelism for the computation

Too small:

  • too much organizational overhead for the scheduler

Optimum:

  • the scheduler needs roughly 1 ms to coordinate the processing of each chunk, so the expected computation time per chunk should be larger than that

  • align chunks with the source of the data, e.g. the zarr chunks (see the sketch below)
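For example, da.from_zarr picks up the chunks stored in the zarr file by default, and a dask array can be rechunked explicitly with .rechunk (a sketch reusing the zarr file created above):

# chunks are taken from the zarr file (here (1000, 1000))
im_da_zarr = da.from_zarr(filename)
print(im_da_zarr.chunks)

# rechunk to larger blocks if the per-chunk work is too small;
# note that rechunking itself adds tasks to the graph
im_da_zarr_rechunked = im_da_zarr.rechunk((2000, 2000))
print(im_da_zarr_rechunked.chunks)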

Dependence of computation time on chunk size#

# large chunk sizes
np.random.seed(0)
im = da.random.randint(0, 100, [500] * 3, chunks=500)
im
%%timeit -r 1
im.sum().compute()
# small chunk sizes
np.random.seed(0)
im = da.random.randint(0, 100, [500] * 3, chunks=50)
im
%%timeit -r 1
im.sum().compute()

Exercise: Find a better chunk size#

# intermediate chunk size
np.random.seed(0)
im = da.random.randint(0, 100, [500] * 3, chunks=200)
im
%%timeit -r 1
im.sum().compute()