Dask Tutorial

Overview

  • teaching: 20 minutes

  • exercises: 0

  • questions:

    • How does Dask parallelize computations in Python?

Table of contents

  1. Dask primer

  2. Dask clusters

  3. Dask dataframe

  4. Dask arrays

  5. Dask delayed

Dask Primer

Dask logo

Dask is a flexible library for parallel, analytic computing in Python. It provides dynamic parallel task scheduling and high-level big-data collections like dask.array and dask.dataframe. More on Dask here: https://docs.dask.org/en/latest/

Note: Pieces of this notebook are adapted from other sources.

Dask Clusters

Dask needs a collection of computing resources in order to perform parallel computations. Dask clusters have different names corresponding to different computing environments (for example, LocalCluster for your laptop, PBSCluster for an HPC system, or a Kubernetes cluster for machines on the cloud). Each cluster has a certain number of computing resources called 'workers', each of which is allocated CPU and RAM. The dask scheduling system maps jobs to the workers on a cluster for you, so the syntax is mostly the same once you initialize a cluster!

[1]:
# Let's start simple with a LocalCluster that makes use of all the cores and RAM we have on a single machine
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
# explicitly connect to the cluster we just created
client = Client(cluster)
client
[1]:

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 8.59 GB
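
The cluster object is the only piece that changes between environments; the Client interface stays the same. As a hedged sketch (not run in this notebook; the queue name and resource values below are placeholders), an HPC version using dask_jobqueue might look like this:

# Hypothetical sketch: workers launched through a PBS job queue on an HPC system
from dask_jobqueue import PBSCluster
from dask.distributed import Client

cluster = PBSCluster(cores=4, memory='16GB', queue='regular')  # placeholder resources and queue name
cluster.scale(jobs=2)     # request two PBS jobs' worth of workers
client = Client(cluster)  # connect exactly as with LocalCluster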

Dask Dataframe

If you are working with a very large Pandas dataframe, you can consider parallelizing computations by turning it into a Dask DataFrame. Dask DataFrames split a dataframe into partitions along an index and support a large subset of the Pandas API. You can find additional details and examples here: https://examples.dask.org/dataframe.html

[2]:
# Although this is a small csv file, we'll reuse our same example from before!
# Load csv results from server into a Pandas DataFrame
import dask.dataframe as dd
server = 'https://webservices.volcano.si.edu/geoserver/GVP-VOTW/ows?'
query = 'service=WFS&version=2.0.0&request=GetFeature&typeName=GVP-VOTW:Smithsonian_VOTW_Holocene_Volcanoes&outputFormat=csv'

# blocksize=None means use a single partition
df = dd.read_csv(server+query, blocksize=None)
[3]:
# We only see the metadata, the actual data are only computed when requested.
df
[3]:
Dask DataFrame Structure:
npartitions=1
Columns (dtype): FID (object), Volcano_Number (int64), Volcano_Name (object), Primary_Volcano_Type (object), Last_Eruption_Year (float64), Country (object), Geological_Summary (object), Region (object), Subregion (object), Latitude (float64), Longitude (float64), Elevation (int64), Tectonic_Setting (object), Geologic_Epoch (object), Evidence_Category (object), Primary_Photo_Link (object), Primary_Photo_Caption (object), Primary_Photo_Credit (object), Major_Rock_Type (object), GeoLocation (object)
Dask Name: from-delayed, 3 tasks
[4]:
# We can break up the table into 4 partitions to map out to each core:
df = df.repartition(npartitions=4)
df
[4]:
Dask DataFrame Structure:
npartitions=4 (same columns and dtypes as above)
Dask Name: repartition, 8 tasks
[5]:
# Let's say we want to know the minimum last eruption year for all volcanoes
last_eruption_year_min = df.Last_Eruption_Year.min()
last_eruption_year_min
[5]:
dd.Scalar<series-..., dtype=float64>
[6]:
# Instead of getting the actual value we see dd.Scalar, which represents a recipe for actually calculating this value
last_eruption_year_min.visualize(format='svg')
[6]:
../../../_images/repos_pangeo-data_pangeo-tutorial-gallery_dask_10_0.svg
[7]:
# To get the value, call the .compute() method
# NOTE: this was slower than using pandas directly... for small data you often don't need parallel computing!
last_eruption_year_min.compute()
[7]:
-10450.0
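
The same lazy-then-compute pattern applies to other Pandas-style operations. As a small sketch using the columns shown above (not part of the original notebook):

# Build a lazy groupby-count, then materialize it as a pandas Series with .compute()
volcanoes_per_country = df.groupby('Country').Volcano_Name.count()
volcanoes_per_country.compute().sort_values(ascending=False).head()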

Dask Arrays

A dask array looks and feels a lot like a numpy array. However, a dask array doesn’t directly hold any data. Instead, it symbolically represents the computations needed to generate the data. Nothing is actually computed until the actual numerical values are needed. This mode of operation is called “lazy”; it allows one to build up complex, large calculations symbolically before turning them over to the scheduler for execution.

If we want to create a numpy array of all ones, we do it like this:

[8]:
import numpy as np
shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np
[8]:
array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

This array contains exactly 32 MB of data:

[9]:
print('%.1f MB' % (ones_np.nbytes / 1e6))
32.0 MB

Now let’s create the same array using dask’s array interface.

[10]:
import dask.array as da
ones = da.ones(shape)
ones
[10]:
        Array          Chunk
Bytes   32.00 MB       32.00 MB
Shape   (1000, 4000)   (1000, 4000)
Count   1 Tasks        1 Chunks
Type    float64        numpy.ndarray

This works, but we didn’t tell dask how to split up the array, so it is not optimized for distributed computation.

A crucial difference with dask is that we must specify the chunks argument. “Chunks” describes how the array is split up into many sub-arrays.

Dask array chunking diagram (source: Dask Array Documentation)

There are several ways to specify chunks. In this lecture, we will use a block shape.
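
For reference, chunks can be specified in a few equivalent forms; a brief sketch (the rest of this notebook uses the explicit block shape):

# Alternative chunk specifications -- all lazy, no data is created yet
da.ones(shape, chunks=1000)          # 1000 elements per block along every axis
da.ones(shape, chunks='auto')        # let dask choose a reasonable chunk size
da.ones(shape, chunks=(500, 2000))   # a different explicit block shape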

[11]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones
[11]:
        Array          Chunk
Bytes   32.00 MB       8.00 MB
Shape   (1000, 4000)   (1000, 1000)
Count   4 Tasks        4 Chunks
Type    float64        numpy.ndarray

Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunksize. No data has been generated yet. When we call .compute() on a dask array, the computation is triggered and the dask array becomes a numpy array.

[12]:
ones.compute()
[12]:
array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

In order to understand what happened when we called .compute(), we can visualize the dask graph: the symbolic operations that make up the array.

[13]:
ones.visualize(format='svg')
[13]:
../../../_images/repos_pangeo-data_pangeo-tutorial-gallery_dask_23_0.svg

Our array has four chunks. To generate it, dask calls np.ones four times and then concatenates the results together into one array.
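
You can also confirm the chunk layout programmatically; a quick sketch:

# One tuple of block sizes per axis, plus the number of blocks along each axis
print(ones.chunks)      # ((1000,), (1000, 1000, 1000, 1000))
print(ones.numblocks)   # (1, 4)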

Rather than immediately loading a dask array (which puts all the data into RAM), it is more common to reduce the data somehow. For example:

[14]:
sum_of_ones = ones.sum()
sum_of_ones.visualize(format='svg')
[14]:
../../../_images/repos_pangeo-data_pangeo-tutorial-gallery_dask_25_0.svg

Here we see dask’s strategy for finding the sum. This simple example illustrates the beauty of dask: it automatically designs an algorithm appropriate for custom operations with big data.

If we make our operation more complex, the graph gets more complex.

[15]:
fancy_calculation = (ones * ones[::-1, ::-1]).mean()
fancy_calculation.visualize(format='svg')
[15]:
../../../_images/repos_pangeo-data_pangeo-tutorial-gallery_dask_27_0.svg

A Bigger Calculation

The examples above were toy examples; the data (32 MB) is nowhere near big enough to warrant the use of dask.

We can make it a lot bigger!

[16]:
bigshape = (200000, 4000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones
[16]:
        Array            Chunk
Bytes   6.40 GB          8.00 MB
Shape   (200000, 4000)   (1000, 1000)
Count   800 Tasks        800 Chunks
Type    float64          numpy.ndarray
[17]:
print('%.1f MB' % (big_ones.nbytes / 1e6))
6400.0 MB

This dataset is 6.4 GB, rather than 32 MB! This is probably close to, or greater than, the amount of RAM available on your computer. Nevertheless, dask has no problem working with it.

Do not try to ``.visualize()`` this array!

When doing a big calculation, dask also has some tools to help us understand what is happening under the hood. Let’s watch the dashboard as we do this bigger computation.
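
If you created the Client above, you can print the address of its diagnostic dashboard directly (a small sketch):

# The distributed Client exposes a link to its diagnostic dashboard
print(client.dashboard_link)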

[18]:
big_calc = (big_ones * big_ones[::-1, ::-1]).mean()

result = big_calc.compute()
result
[18]:
1.0

Reduction

All the usual numpy methods work on dask arrays. You can also apply numpy functions directly to a dask array, and it will stay lazy.

[19]:
big_ones_reduce = (np.cos(big_ones)**2).mean(axis=1)
big_ones_reduce
[19]:
        Array        Chunk
Bytes   1.60 MB      8.00 kB
Shape   (200000,)    (1000,)
Count   3400 Tasks   200 Chunks
Type    float64      numpy.ndarray

Plotting also triggers computation, since we need the actual values.

[20]:
from matplotlib import pyplot as plt
%matplotlib inline
plt.rcParams['figure.figsize'] = (12,8)
[21]:
plt.plot(big_ones_reduce)
[21]:
[<matplotlib.lines.Line2D at 0x7f9d71200a90>]
../../../_images/repos_pangeo-data_pangeo-tutorial-gallery_dask_37_1.png

Dask Delayed

Dask.delayed is a simple and powerful way to parallelize existing code. It allows users to delay function calls into a task graph with dependencies. Dask.delayed doesn’t provide any fancy parallel algorithms like Dask.dataframe, but it does give the user complete control over what they want to build.

Systems like Dask.dataframe are built with Dask.delayed. If you have a problem that is parallelizable, but isn’t as simple as just a big array or a big dataframe, then dask.delayed may be the right choice for you.

Create simple functions

These functions do simple operations like add two numbers together, but they sleep for a random amount of time to simulate real work.

[22]:
import time

def inc(x):
    time.sleep(0.1)
    return x + 1

def dec(x):
    time.sleep(0.1)
    return x - 1

def add(x, y):
    time.sleep(0.2)
    return x + y

We can run them like normal Python functions below.

[23]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z
CPU times: user 14.7 ms, sys: 582 µs, total: 15.3 ms
Wall time: 401 ms
[23]:
3

These ran one after the other, in sequence. Note, though, that the first two calls, inc(1) and dec(2), don’t depend on each other; we could have run them in parallel had we been clever.

Annotate functions with Dask Delayed to make them lazy

We can call dask.delayed on our functions to make them lazy. Rather than computing their results immediately, they record what we want to compute as a task in a graph that we’ll run later on parallel hardware.

[24]:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)
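
dask.delayed can also be applied as a decorator when defining a new function; a small sketch with a hypothetical mul function:

# Equivalent: wrap a function at definition time using the decorator form
@dask.delayed
def mul(x, y):
    time.sleep(0.2)
    return x * y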

Calling these lazy functions is now almost free. We’re just constructing a graph.

[25]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z
CPU times: user 2.17 ms, sys: 0 ns, total: 2.17 ms
Wall time: 1.69 ms
[25]:
Delayed('add-be0ee523-1a89-45fe-9160-6a987b863f6c')

Visualize computation

[26]:
z.visualize(format='svg', rankdir='LR')
[26]:
../../../_images/repos_pangeo-data_pangeo-tutorial-gallery_dask_47_0.svg

Run in parallel

Call .compute() when you want your result as a normal Python object.

If you started Client() above then you may want to watch the status page during computation.

[27]:
%%time
z.compute()
CPU times: user 31.7 ms, sys: 4.14 ms, total: 35.8 ms
Wall time: 330 ms
[27]:
3

Parallelize Normal Python code

Now we use Dask in normal for-loopy Python code. This generates graphs instead of doing computations directly, but still looks like the code we had before. Dask is a convenient way to add parallelism to existing workflows.

[28]:
%%time
zs = []
for i in range(256):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)

zs = dask.persist(*zs)  # trigger computation in the background
CPU times: user 67.2 ms, sys: 16.6 ms, total: 83.7 ms
Wall time: 79.7 ms
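
dask.persist starts the work in the background and keeps the results on the workers; to bring concrete values back as plain Python objects, you could follow up with dask.compute, as in this sketch:

# Gather the persisted results back into ordinary Python integers
results = dask.compute(*zs)
print(results[:5])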