Class SpecCluster

Cluster that requires a full specification of workers

Declaration

class SpecCluster(Cluster)

Documentation

The SpecCluster class expects a full specification of the Scheduler and
Workers to use.  It removes any handling of user inputs (like threads vs
processes, number of cores, and so on) and any handling of cluster resource
managers (like pods, jobs, and so on).  Instead, it expects this
information to be passed in scheduler and worker specifications.  This
class does handle all of the logic around asynchronously and cleanly
setting things up and tearing them down at the right times.  Hopefully it
can form a base for other, more user-centric classes.

Attributes

worker_spec : dict
    Mapping of worker name to worker specification, as shown in the
    examples below
workers : dict
    Mapping of worker name to the instantiated worker object

Examples

To create a SpecCluster you specify how to set up a Scheduler and Workers

>>> from dask.distributed import Scheduler, Worker, Nanny
>>> scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
>>> workers = {
...     'my-worker': {"cls": Worker, "options": {"nthreads": 1}},
...     'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}},
... }
>>> cluster = SpecCluster(scheduler=scheduler, workers=workers)

The worker spec is stored as the ``.worker_spec`` attribute

>>> cluster.worker_spec
{
    'my-worker': {"cls": Worker, "options": {"nthreads": 1}},
    'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}},
}

The instantiated objects of this spec are stored in the ``.workers``
attribute

>>> cluster.workers
{
    'my-worker': <Worker ...>,
    'my-nanny': <Nanny ...>,
}

Should the spec change, we can await the cluster or call the
``._correct_state`` method to align the actual state to the specified
state.
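
For instance, a minimal sketch: this assumes the cluster was created with
``asynchronous=True`` so that it can be awaited directly, and ``'extra'``
is just an illustrative key

>>> cluster.worker_spec['extra'] = {"cls": Worker, "options": {"nthreads": 2}}
>>> await cluster  # start the newly specified worker

>>> del cluster.worker_spec['extra']
>>> await cluster  # tear the corresponding worker down again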

We can also ``.scale(...)`` the cluster, which adds new workers of a given
form.

>>> worker = {'cls': Worker, 'options': {}}
>>> cluster = SpecCluster(scheduler=scheduler, worker=worker)
>>> cluster.worker_spec
{}

>>> cluster.scale(3)
>>> cluster.worker_spec
{
    0: {'cls': Worker, 'options': {}},
    1: {'cls': Worker, 'options': {}},
    2: {'cls': Worker, 'options': {}},
}

Note that above we are using the standard ``Worker`` and ``Nanny`` classes;
in practice, other classes could be used that handle resource management,
like ``KubernetesPod`` or ``SLURMJob``.  The spec does not need to conform
to the expectations of the standard Dask Worker class.  It just needs to be
called with the provided options, support the ``__await__`` and ``close``
methods, and expose the ``worker_address`` property.
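
As a rough illustration, a hypothetical class satisfying that contract
might look like the sketch below.  ``MyRemoteWorker`` and everything inside
it are invented for illustration, and we additionally assume the class is
called with the scheduler address alongside the spec's options

>>> class MyRemoteWorker:
...     """Hypothetical spec-compatible worker: awaitable, closable, addressable"""
...     def __init__(self, scheduler=None, name=None, **options):
...         self.scheduler = scheduler    # scheduler address (assumed argument)
...         self.name = name
...         self.options = options        # the spec's "options" dictionary
...         self.worker_address = None    # filled in once the worker starts
...     async def _start(self):
...         # launch a worker on your resource manager here (invented detail)
...         self.worker_address = "tcp://..."
...         return self
...     def __await__(self):
...         # awaiting the instance starts it and returns it
...         return self._start().__await__()
...     async def close(self):
...         # tear the remote worker down again
...         pass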

Also note that uniformity of the specification is not required.  Other
APIs could be added externally (in subclasses) that add workers of
different specifications into the same dictionary.
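
For instance, such an API could place differently shaped entries side by
side.  In this sketch ``GPUWorker`` is an invented stand-in for any
spec-compatible class

>>> cluster.worker_spec['gpu-0'] = {"cls": GPUWorker, "options": {"nthreads": 4}}
>>> cluster.worker_spec['cpu-0'] = {"cls": Worker, "options": {"nthreads": 1}}
>>> await cluster  # both kinds of worker now live in the same dictionary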

If a single entry in the spec will generate multiple dask workers, then
please provide a ``"group"`` element in the spec that includes the
suffixes that will be added to each name (this should be handled by your
worker class).

>>> cluster.worker_spec
{
    0: {"cls": MultiWorker, "options": {"processes": 3}, "group": ["-0", "-1", -2"]}
    1: {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]}
}

These suffixes should correspond to the names used by the workers when
they deploy.

>>> [ws.name for ws in cluster.scheduler.workers.values()]
["0-0", "0-1", "0-2", "1-0", "1-1"]
