## Overview

• teaching: 20 minutes

• exercises: 0

• questions:

• How does Dask parallelize computations in Python?

Dask is a flexible parallel computing library for analytic computing. Dask provides dynamic parallel task scheduling and high-level big-data collections like dask.array and dask.dataframe. More on dask here: https://docs.dask.org/en/latest/

Note: Pieces of this notebook come from the following sources:

Dask needs a collection of computing resources in order to perform parallel computations. Dask clusters have different names corresponding to different computing environments (for example, LocalCluster for your laptop, PBSCluster for your HPC system, or KubeCluster for Kubernetes machines in the cloud). Each cluster has a certain number of computing resources called ‘workers’, each of which is allocated CPU and RAM. The Dask scheduler maps tasks to the workers on a cluster for you, so the syntax is mostly the same once you initialize a cluster!

[1]:

# Let's start simple with a LocalCluster that makes use of all the cores and RAM we have on a single machine
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
# explicitly connect to the cluster we just created
client = Client(cluster)
client

[1]:


### Cluster

• Workers: 2
• Cores: 2
• Memory: 8.59 GB
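The cluster above chose its number of workers from the machine it ran on. A minimal sketch of sizing a `LocalCluster` explicitly follows; `n_workers`, `threads_per_worker`, `memory_limit`, and `processes` are `LocalCluster` keyword arguments, but the specific values are illustrative assumptions, and `processes=False` (workers as threads in one process) is chosen only to keep the example lightweight.

```python
from dask.distributed import Client, LocalCluster

# Illustrative sizing (assumed values): 2 workers, 1 thread each,
# and a 1 GB memory limit per worker. processes=False runs the workers
# as threads inside this Python process, which keeps the sketch lightweight.
with LocalCluster(n_workers=2, threads_per_worker=1,
                  memory_limit="1GB", processes=False) as cluster:
    with Client(cluster) as client:
        client.wait_for_workers(2)                  # block until both workers have registered
        workers = client.scheduler_info()["workers"]
        nthreads = client.nthreads()                # {worker address: thread count}
        print(len(workers), "workers:", nthreads)
```

On an HPC system or in the cloud the same `Client(cluster)` pattern applies; only the cluster class and its arguments change.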

If you are working with a very large Pandas dataframe, you can consider parallelizing computations by turning it into a Dask DataFrame. Dask DataFrames split a dataframe into partitions along an index. They support a large subset of the Pandas API. You can find additional details and examples here: https://examples.dask.org/dataframe.html

[2]:

# Although this is a small CSV file, we'll reuse our same example from before!
# Load CSV results from the server into a Dask DataFrame
import dask.dataframe as dd

server = 'https://webservices.volcano.si.edu/geoserver/GVP-VOTW/ows?'
query = 'service=WFS&version=2.0.0&request=GetFeature&typeName=GVP-VOTW:Smithsonian_VOTW_Holocene_Volcanoes&outputFormat=csv'

# blocksize=None means use a single partition
df = dd.read_csv(server + query, blocksize=None)

[3]:

# We only see the metadata; the actual data are only computed when requested.
df

[3]:

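| | FID | Volcano_Number | Volcano_Name | Primary_Volcano_Type | Last_Eruption_Year | Country | Geological_Summary | Region | Subregion | Latitude | Longitude | Elevation | Tectonic_Setting | Geologic_Epoch | Evidence_Category | Primary_Photo_Link | Primary_Photo_Caption | Primary_Photo_Credit | Major_Rock_Type | GeoLocation |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=1 | | | | | | | | | | | | | | | | | | | | |
| | object | int64 | object | object | float64 | object | object | object | object | float64 | float64 | int64 | object | object | object | object | object | object | object | object |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
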
[4]:

# We can break up the table into 4 partitions to map out to each core:
df = df.repartition(npartitions=4)
df

[4]:

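| | FID | Volcano_Number | Volcano_Name | Primary_Volcano_Type | Last_Eruption_Year | Country | Geological_Summary | Region | Subregion | Latitude | Longitude | Elevation | Tectonic_Setting | Geologic_Epoch | Evidence_Category | Primary_Photo_Link | Primary_Photo_Caption | Primary_Photo_Credit | Major_Rock_Type | GeoLocation |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=4 | | | | | | | | | | | | | | | | | | | | |
| | object | int64 | object | object | float64 | object | object | object | object | float64 | float64 | int64 | object | object | object | object | object | object | object | object |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
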
[5]:

# Let's say we want to know the minimum last eruption year for all volcanoes
last_eruption_year_min = df.Last_Eruption_Year.min()
last_eruption_year_min

[5]:

dd.Scalar<series-..., dtype=float64>

[6]:

# Instead of getting the actual value we see dd.Scalar, which represents a recipe for actually calculating this value
last_eruption_year_min.visualize(format='svg')

[6]:

[7]:

# To get the value, call the .compute() method
# NOTE: this was slower than using pandas directly; for small data you often don't need parallel computing!
last_eruption_year_min.compute()

[7]:

-10450.0


A dask array looks and feels a lot like a numpy array. However, a dask array doesn’t directly hold any data. Instead, it symbolically represents the computations needed to generate the data. Nothing is actually computed until the actual numerical values are needed. This mode of operation is called “lazy”; it allows one to build up complex, large calculations symbolically before turning them over to the scheduler for execution.

If we want to create a numpy array of all ones, we do it like this:

[8]:

import numpy as np
shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np

[8]:

array([[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
...,
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.]])


This array contains exactly 32 MB of data:

[9]:

print('%.1f MB' % (ones_np.nbytes / 1e6))

32.0 MB


Now let’s create the same array using dask’s array interface.

[10]:

import dask.array as da
ones = da.ones(shape)
ones

[10]:

| | Array | Chunk |
|---|---|---|
| Bytes | 32.00 MB | 32.00 MB |
| Shape | (1000, 4000) | (1000, 4000) |
| Count | 1 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |

This works, but we didn’t tell dask how to split up the array, so the whole array is a single chunk and there is nothing for dask to distribute across workers.

A crucial difference with dask is that we should specify the chunks argument. “Chunks” describes how the array is split up into many sub-arrays.

Source: [Dask Array Documentation](http://dask.pydata.org/en/latest/array-overview.html)

There are several ways to specify chunks. In this lecture, we will use a block shape.
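For reference, a short sketch of three chunk specifications Dask accepts: a block shape (used in this lecture), a single integer applied to every dimension, and explicit per-dimension chunk sizes. The `.numblocks` and `.chunks` attributes report the resulting layout without computing anything.

```python
import dask.array as da

shape = (1000, 4000)

# Block shape: every chunk is 1000 x 1000
a = da.ones(shape, chunks=(1000, 1000))
# A single integer: chunk length 500 along every dimension
b = da.ones(shape, chunks=500)
# Explicit chunk sizes per dimension (each tuple must add up to that dimension)
c = da.ones(shape, chunks=((500, 500), (1000, 3000)))

for arr in (a, b, c):
    print(arr.numblocks, arr.chunks)
```

Recent Dask versions also accept `chunks="auto"`, which lets Dask pick a chunk size for you.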

[11]:

chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones

[11]:

| | Array | Chunk |
|---|---|---|
| Bytes | 32.00 MB | 8.00 MB |
| Shape | (1000, 4000) | (1000, 1000) |
| Count | 4 Tasks | 4 Chunks |
| Type | float64 | numpy.ndarray |

Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunk size. No data has been generated yet. When we call .compute() on a dask array, the computation is triggered and the result is returned as a numpy array.

[12]:

ones.compute()

[12]:

array([[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
...,
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.]])


To understand what happened when we called .compute(), we can visualize the dask graph, i.e. the symbolic operations that make up the array.

[13]:

ones.visualize(format='svg')

[13]:


Our array has four chunks. To generate it, dask calls np.ones four times and then concatenates the results into one array.
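To make this concrete, here is a NumPy-only sketch that mimics what the graph does for the four-chunk array: build each 1000 x 1000 block separately, then join the blocks along the second axis. It illustrates the idea only; it is not Dask's internal code path. The last lines use `.blocks` to pull out a single chunk lazily.

```python
import numpy as np
import dask.array as da

shape = (1000, 4000)
chunk_shape = (1000, 1000)

# What the graph does, by hand: one np.ones call per chunk ...
blocks = [np.ones(chunk_shape) for _ in range(4)]
# ... then stitch the chunks together along axis 1
manual = np.concatenate(blocks, axis=1)

ones = da.ones(shape, chunks=chunk_shape)
print(manual.shape, np.array_equal(manual, ones.compute()))

# Individual chunks can be selected lazily via .blocks
first_chunk = ones.blocks[0, 0]
print(first_chunk.shape)
```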

Rather than immediately loading a dask array (which puts all the data into RAM), it is more common to reduce the data somehow. For example:

[14]:

sum_of_ones = ones.sum()
sum_of_ones.visualize(format='svg')

[14]:


Here we see dask’s strategy for finding the sum: each chunk is summed separately, and the partial sums are then combined into the final result. This simple example illustrates the beauty of dask: it automatically designs an algorithm appropriate for custom operations with big data.
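The two stages of that strategy can be reproduced by hand: one lazy partial sum per chunk, then a final combine. A sketch using `.blocks` and `dask.compute` (for larger arrays Dask's own reduction may combine partial results over several levels, which this sketch does not model):

```python
import dask
import dask.array as da

ones = da.ones((1000, 4000), chunks=(1000, 1000))

# Stage 1: one lazy partial sum per chunk (this array has 1 x 4 chunks)
rows, cols = ones.numblocks
partials = [ones.blocks[i, j].sum() for i in range(rows) for j in range(cols)]

# Evaluate all partial sums in one pass; each is a small scalar
partial_values = dask.compute(*partials)

# Stage 2: combine the partial results
manual_total = sum(partial_values)
print(manual_total, ones.sum().compute())
```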

If we make our operation more complex, the graph gets more complex.

[15]:

fancy_calculation = (ones * ones[::-1, ::-1]).mean()
fancy_calculation.visualize(format='svg')

[15]:
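One way to convince yourself that the more complex graph still computes the right thing is to compare it with plain NumPy on random data. A sketch, assuming random test values wrapped with `da.from_array` (small floating-point differences from summation order are expected, hence `np.isclose`):

```python
import numpy as np
import dask.array as da

rng = np.random.default_rng(0)
x_np = rng.random((1000, 4000))          # random test data

# Wrap the NumPy array as a chunked dask array
x = da.from_array(x_np, chunks=(1000, 1000))

# Same expression as above: elementwise product with the array flipped on both axes
lazy = (x * x[::-1, ::-1]).mean()
eager = (x_np * x_np[::-1, ::-1]).mean()

print(np.isclose(lazy.compute(), eager))
```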


## A Bigger Calculation

The examples above were toy examples; the data (32 MB) is nowhere near big enough to warrant the use of dask.

We can make it a lot bigger!

[16]:

bigshape = (200000, 4000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones

[16]:

| | Array | Chunk |
|---|---|---|
| Bytes | 6.40 GB | 8.00 MB |
| Shape | (200000, 4000) | (1000, 1000) |
| Count | 800 Tasks | 800 Chunks |
| Type | float64 | numpy.ndarray |
[17]:

print('%.1f MB' % (big_ones.nbytes / 1e6))

6400.0 MB


This dataset is 6.4 GB, rather than 32 MB! This is probably close to or greater than the amount of RAM available on your computer. Nevertheless, dask has no problem working on it, because it processes the array in 8 MB chunks rather than materializing all 6.4 GB at once.
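The size and chunk count follow directly from the shape and the 8 bytes per float64 element, and Dask reports them from metadata without allocating the array. A quick check of the arithmetic:

```python
import dask.array as da

big_ones = da.ones((200000, 4000), chunks=(1000, 1000))

# 200,000 x 4,000 elements x 8 bytes per float64 = 6.4e9 bytes
expected_bytes = 200_000 * 4_000 * 8
print(big_ones.nbytes == expected_bytes)

# (200,000 / 1,000) x (4,000 / 1,000) = 200 x 4 = 800 chunks
print(big_ones.numblocks, big_ones.npartitions)
```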

Do not try to .visualize() this array!

When doing a big calculation, dask also has some tools to help us understand what is happening under the hood. Let’s watch the dashboard again as we do a bigger computation.

[18]:

big_calc = (big_ones * big_ones[::-1, ::-1]).mean()

result = big_calc.compute()
result

[18]:

1.0
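The dashboard belongs to the distributed scheduler. When no `Client` is running, Dask's local schedulers can report progress with `dask.diagnostics.ProgressBar`, which prints a text bar while `.compute()` runs. A minimal sketch on a smaller array (the shape is an arbitrary choice to keep it quick):

```python
import dask.array as da
from dask.diagnostics import ProgressBar

x = da.ones((8000, 4000), chunks=(1000, 1000))
calc = (x * x[::-1, ::-1]).mean()

# ProgressBar hooks into the local (threaded/multiprocessing) schedulers;
# with a distributed Client active, watch the dashboard instead.
with ProgressBar():
    result = calc.compute(scheduler="threads")
print(result)
```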


## Reduction

All the usual numpy methods work on dask arrays. You can also apply numpy functions directly to a dask array, and the result will stay lazy.

[19]:

big_ones_reduce = (np.cos(big_ones)**2).mean(axis=1)
big_ones_reduce

[19]:

| | Array | Chunk |
|---|---|---|
| Bytes | 1.60 MB | 8.00 kB |
| Shape | (200000,) | (1000,) |
| Count | 3400 Tasks | 200 Chunks |
| Type | float64 | numpy.ndarray |
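A quick way to confirm that a numpy function keeps the result lazy, and that the values match plain numpy, is to repeat the reduction on a smaller array built from known data (the `linspace` input is an arbitrary test choice):

```python
import numpy as np
import dask.array as da

x_np = np.linspace(0, 10, 50 * 4000).reshape(50, 4000)
x = da.from_array(x_np, chunks=(10, 1000))

# np.cos applied to a dask array returns another lazy dask array
lazy = (np.cos(x) ** 2).mean(axis=1)
print(type(lazy).__name__)

# Only .compute() produces concrete values; compare with numpy
print(np.allclose(lazy.compute(), (np.cos(x_np) ** 2).mean(axis=1)))
```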

Plotting also triggers computation, since we need the actual values.

[20]:

from matplotlib import pyplot as plt
%matplotlib inline
plt.rcParams['figure.figsize'] = (12,8)

[21]:

plt.plot(big_ones_reduce)

[21]:

[<matplotlib.lines.Line2D at 0x7f9d71200a90>]


Dask.delayed is a simple and powerful way to parallelize existing code. It allows users to delay function calls into a task graph with dependencies. Dask.delayed doesn’t provide any fancy parallel algorithms like Dask.dataframe, but it does give the user complete control over what they want to build.

Systems like Dask.dataframe are built with Dask.delayed. If you have a problem that is parallelizable but isn’t as simple as just a big array or a big dataframe, then dask.delayed may be the right choice for you.
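Besides wrapping an existing function with `dask.delayed(func)`, `dask.delayed` can also be used as a decorator. A minimal sketch with hypothetical `square` and `total` functions, showing that a list of delayed results can be passed straight into another delayed call:

```python
import dask

@dask.delayed
def square(x):
    return x * x

@dask.delayed
def total(values):
    return sum(values)

# Calling the decorated functions only records tasks
squares = [square(i) for i in range(5)]
result = total(squares)        # a list of Delayed objects is accepted as an argument

print(result.compute())
```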

### Create simple functions

These functions do simple operations like adding two numbers together, but they sleep for a short, fixed amount of time (0.1 or 0.2 seconds) to simulate real work.

[22]:

import time

def inc(x):
    time.sleep(0.1)
    return x + 1

def dec(x):
    time.sleep(0.1)
    return x - 1

def add(x, y):
    time.sleep(0.2)
    return x + y


We can run them like normal Python functions:

[23]:

%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

CPU times: user 14.7 ms, sys: 582 µs, total: 15.3 ms
Wall time: 401 ms

[23]:

3


These ran one after the other, in sequence. Note, though, that the first two lines, inc(1) and dec(2), don’t depend on each other, so we could have called them in parallel had we been clever.

### Annotate functions with Dask Delayed to make them lazy

We can call dask.delayed on our functions to make them lazy. Rather than computing their results immediately, they record what we want to compute as a task in a graph that we’ll run later on parallel hardware.

[24]:

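import dask

# Wrap each function so that calling it records a task instead of running it
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)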

Calling these lazy functions is now almost free. We’re just constructing a graph.

[25]:

%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

CPU times: user 2.17 ms, sys: 0 ns, total: 2.17 ms
Wall time: 1.69 ms

[25]:

Delayed('add-be0ee523-1a89-45fe-9160-6a987b863f6c')


### Visualize computation

[26]:

z.visualize(format='svg', rankdir='LR')

[26]:


### Run in parallel

Call .compute() when you want your result as a normal Python object.

If you started Client() above then you may want to watch the status page during computation.

[27]:

%%time
z.compute()

CPU times: user 31.7 ms, sys: 4.14 ms, total: 35.8 ms
Wall time: 330 ms

[27]:

3
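The same graph can be run on a specific local scheduler by passing `scheduler=` to `.compute()`. A self-contained sketch that re-defines the tutorial's `inc`, `dec`, and `add` with the decorator form and uses the threaded scheduler. Because `inc(1)` and `dec(2)` are independent they can sleep at the same time, so the wall time should land near 0.3 s (0.1 s in parallel, then 0.2 s for `add`) rather than the roughly 0.4 s of the sequential run:

```python
import time
import dask

@dask.delayed
def inc(x):
    time.sleep(0.1)
    return x + 1

@dask.delayed
def dec(x):
    time.sleep(0.1)
    return x - 1

@dask.delayed
def add(x, y):
    time.sleep(0.2)
    return x + y

z = add(inc(1), dec(2))

start = time.perf_counter()
value = z.compute(scheduler="threads")   # run on a local thread pool
elapsed = time.perf_counter() - start
print(value, f"{elapsed:.2f} s")
```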


### Parallelize Normal Python code

Now we use Dask in normal for-loopy Python code. This generates graphs instead of doing computations directly, but still looks like the code we had before. Dask is a convenient way to add parallelism to existing workflows.
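A compact, self-contained sketch of the pattern, re-defining the tutorial's `inc`, `dec`, and `add` without the sleeps: build many delayed results in an ordinary loop, then evaluate them together with `dask.compute`, or fold them into one more delayed call such as `dask.delayed(sum)`.

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def dec(x):
    return x - 1

@dask.delayed
def add(x, y):
    return x + y

zs = []
for i in range(256):
    x = inc(i)
    y = dec(x)
    zs.append(add(x, y))   # nothing has run yet; zs holds Delayed objects

# Evaluate every element of zs in one scheduler call
values = dask.compute(*zs)
# Or reduce them lazily and compute a single number
total = dask.delayed(sum)(zs).compute()
print(values[:3], total)
```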

[28]:

%%time
zs = []
for i in range(256):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)

CPU times: user 67.2 ms, sys: 16.6 ms, total: 83.7 ms