Using Dask Gateway

This cluster has been configured to use Dask-Gateway. This notebook demonstrates how to use it.

Creating a Cluster

We’ve done most of the configuration for you. All that remains is to create a new cluster or connect to an existing one, which you do through a dask_gateway.Gateway object.

If the default settings are appropriate, you can create a new cluster without specifying any additional information.

[1]:
from dask_gateway import Gateway
gateway = Gateway()
[2]:
gateway.list_clusters()
[2]:
[]
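The empty list above means no clusters are running yet. When clusters do exist, you can reconnect to one instead of starting another. The sketch below is one possible way to do that, not part of the configured environment: `get_or_create_cluster` is a hypothetical helper name, and it assumes each entry returned by `list_clusters()` exposes a `name` attribute that `Gateway.connect` accepts.

```python
def get_or_create_cluster(gateway):
    """Reconnect to the first listed cluster, or create one if none exist.

    `gateway` is expected to behave like a dask_gateway.Gateway object.
    This helper is illustrative and not part of dask_gateway itself.
    """
    reports = gateway.list_clusters()
    if reports:
        # Each report carries the name of a running cluster.
        return gateway.connect(reports[0].name)
    # Nothing is running yet, so start a cluster with the default settings.
    return gateway.new_cluster()


# Usage in this notebook (needs a reachable gateway):
#   from dask_gateway import Gateway
#   cluster = get_or_create_cluster(Gateway())
```

Picking the first listed cluster is an arbitrary choice made to keep the sketch short. If you run several clusters, you would probably want to select one by name.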
[3]:
cluster = gateway.new_cluster()
cluster

gateway.new_cluster() created a new Dask cluster. You can pass this cluster to a dask.distributed.Client to create a client so that any computations using Dask will be executed on the cluster.

[4]:
from distributed import Client

client = Client(cluster)
client
[4]:

Client

Cluster

  • Workers: 0
  • Cores: 0
  • Memory: 0 B
[5]:
import dask.array as da

arr = da.random.random((1000, 1000), chunks=100).persist()

By default, the cluster doesn’t have any workers. You’ll need to either set your cluster to adaptive mode or scale manually.

In adaptive mode, the cluster will automatically resize itself based on the workload.

[6]:
cluster.adapt()
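The two approaches mentioned above, adaptive mode and manual scaling, can be sketched side by side. `start_workers` is a hypothetical helper written for illustration. It assumes the cluster object accepts `adapt(minimum=..., maximum=...)` and `scale(n)`, as `dask_gateway.GatewayCluster` does, and the worker counts in the usage comments are arbitrary.

```python
def start_workers(cluster, minimum=None, maximum=None, fixed=None):
    """Request workers either at a fixed size or within adaptive bounds.

    Illustrative helper, not part of dask_gateway.
    """
    if fixed is not None:
        # Manual scaling: ask for exactly `fixed` workers.
        cluster.scale(fixed)
    else:
        # Adaptive mode: the cluster resizes itself between the bounds
        # according to the workload. None leaves a bound unset.
        cluster.adapt(minimum=minimum, maximum=maximum)


# Usage in this notebook:
#   start_workers(cluster, minimum=0, maximum=10)  # adaptive, at most 10 workers
#   start_workers(cluster, fixed=2)                # always 2 workers
```

Setting a maximum in adaptive mode is a common way to keep a large task graph from requesting more workers than you intended.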
[7]:
arr.compute()
[7]:
array([[0.1431792 , 0.93820099, 0.66730453, ..., 0.03956389, 0.2044932 ,
        0.18304753],
       [0.64694806, 0.49555045, 0.83191087, ..., 0.41496721, 0.70523034,
        0.6904073 ],
       [0.71098109, 0.14479997, 0.57938282, ..., 0.05745631, 0.45435815,
        0.94684564],
       ...,
       [0.24220808, 0.41938078, 0.63138765, ..., 0.50079068, 0.61568748,
        0.56408695],
       [0.64825169, 0.87362234, 0.57633359, ..., 0.69878213, 0.27189681,
        0.74026788],
       [0.55767535, 0.30320157, 0.69172876, ..., 0.84584712, 0.28080072,
        0.88013722]])

How long the arr.compute() call takes depends on the state of the Kubernetes cluster. If the cluster has been idle recently, additional machines may need to start up in the background before your work can run.

Once those machines are running, subsequent computations will be much faster.

[8]:
client.close()
cluster.close()
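Closing the client and the cluster by hand, as above, is easy to forget when a computation fails partway through. One way to guarantee cleanup is a small context manager built on `try`/`finally`. This is a sketch under assumptions: `temporary_cluster` and `client_factory` are hypothetical names, and in this notebook `client_factory` would be `distributed.Client`.

```python
from contextlib import contextmanager


@contextmanager
def temporary_cluster(gateway, client_factory):
    """Yield a client on a fresh cluster and close both on exit.

    Illustrative helper, not part of dask_gateway. `client_factory` is
    any callable that turns a cluster into a client.
    """
    cluster = gateway.new_cluster()
    client = None
    try:
        client = client_factory(cluster)
        yield client
    finally:
        # Close in the same order as the cell above: client, then cluster.
        if client is not None:
            client.close()
        cluster.close()


# Usage in this notebook:
#   from distributed import Client
#   with temporary_cluster(gateway, Client) as client:
#       ...  # run computations here
```

Because the cleanup runs in `finally`, the cluster is closed even if the body of the `with` block raises an exception.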

Specifying Options

We’ve configured settings such as the number of cores per worker to suit the most common use cases. For additional control over the cluster, use gateway.cluster_options().

[9]:
options = gateway.cluster_options()
options

You can either adjust these options manually in the widget or set them programmatically.

[10]:
options.worker_cores = 4
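If you need to set several options at once, the attribute assignment above can be wrapped in a loop. `apply_overrides` is a hypothetical helper, not a dask_gateway function. Which option names exist depends on how the gateway was configured, so only `worker_cores` from this notebook is used in the usage comment. The sketch assumes that assigning an unknown option name raises `AttributeError`, and it lets that error propagate.

```python
def apply_overrides(options, overrides):
    """Set each name/value pair in `overrides` on the options object.

    Illustrative helper. `options` is expected to behave like the object
    returned by gateway.cluster_options().
    """
    for name, value in overrides.items():
        # Equivalent to writing `options.<name> = value` for each entry.
        setattr(options, name, value)
    return options


# Usage in this notebook:
#   options = apply_overrides(gateway.cluster_options(), {"worker_cores": 4})
```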

The “Image” is a URL to a Docker image with the environment that will be loaded on the Dask scheduler and workers. This determines the versions of libraries that are available. By default, this matches the image you’re currently working in.

[11]:
cluster = gateway.new_cluster(cluster_options=options)
cluster.scale(1)
cluster

Notice that this cluster has 4 cores per worker, as we requested in the options.