The SpecCluster class expects a full specification of the Scheduler and Workers to use. It removes any handling of user inputs (like threads vs. processes, number of cores, and so on) and any handling of cluster resource managers (like pods, jobs, and so on). Instead, it expects this information to be passed in scheduler and worker specifications. This class does handle all of the logic around asynchronously and cleanly setting things up and tearing them down at the right times. Hopefully it can form a base for other, more user-centric classes.
A dictionary mapping names to worker classes and their specifications. See example below.
A similar mapping for a scheduler
A specification of a single worker. This is used for any new workers that are created.
If this is intended to be used directly within an event loop with async/await
Whether or not we should silence logging when setting up the cluster.
A name to use when printing out the cluster; defaults to the type name.
To create a SpecCluster you specify how to set up a Scheduler and Workers

>>> from dask.distributed import Scheduler, Worker, Nanny
>>> scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
>>> workers = {
...     'my-worker': {"cls": Worker, "options": {"nthreads": 1}},
...     'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}},
... }
>>> cluster = SpecCluster(scheduler=scheduler, workers=workers)

The worker spec is stored as the ``.worker_spec`` attribute

>>> cluster.worker_spec
{
    'my-worker': {"cls": Worker, "options": {"nthreads": 1}},
    'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}},
}

While the instantiation of this spec is stored in the ``.workers`` attribute

>>> cluster.workers
{
    'my-worker': <Worker ...>
    'my-nanny': <Nanny ...>
}

Should the spec change, we can await the cluster or call the
``._correct_state`` method to align the actual state to the specified state.

We can also ``.scale(...)`` the cluster, which adds new workers of a given
form.

>>> worker = {'cls': Worker, 'options': {}}
>>> cluster = SpecCluster(scheduler=scheduler, worker=worker)
>>> cluster.worker_spec
{}

>>> cluster.scale(3)
>>> cluster.worker_spec
{
    0: {'cls': Worker, 'options': {}},
    1: {'cls': Worker, 'options': {}},
    2: {'cls': Worker, 'options': {}},
}

Note that above we are using the standard ``Worker`` and ``Nanny`` classes;
however, in practice other classes could be used that handle resource
management, like ``KubernetesPod`` or ``SLURMJob``. The spec does not need to
conform to the expectations of the standard Dask Worker class. It just needs
to be called with the provided options, support ``__await__`` and ``close``
methods, and the ``worker_address`` property.

Also note that uniformity of the specification is not required. Other APIs
could be added externally (in subclasses) that add workers of different
specifications into the same dictionary.

If a single entry in the spec will generate multiple Dask workers, then
please provide a ``"group"`` element to the spec that includes the suffixes
that will be added to each name (this should be handled by your worker
class).

>>> cluster.worker_spec
{
    0: {"cls": MultiWorker, "options": {"processes": 3}, "group": ["-0", "-1", "-2"]},
    1: {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]},
}

These suffixes should correspond to the names used by the workers when they
deploy.

>>> [ws.name for ws in cluster.scheduler.workers.values()]
["0-0", "0-1", "0-2", "1-0", "1-1"]
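To make that contract concrete, the following is a minimal sketch of a hypothetical worker class that could appear in a spec. The class name ``EchoWorker``, its constructor signature, and the placeholder address are illustrative assumptions rather than part of the SpecCluster API; the only requirements taken from the text above are that the object is constructed with the spec's ``options``, supports ``__await__`` and ``close``, and exposes a ``worker_address`` property::

    import asyncio

    class EchoWorker:
        """Hypothetical minimal worker usable in a SpecCluster spec.

        Illustrative only: a real class would start a process, pod, or
        batch job here rather than just recording a placeholder address.
        """

        def __init__(self, *args, **options):
            # The exact positional arguments passed by the cluster are not
            # asserted here; accept them generically alongside the spec's
            # keyword options.
            self.options = options
            self._address = None

        @property
        def worker_address(self):
            # Address of the running worker, once started
            return self._address

        def __await__(self):
            # Awaiting the object should start the worker and return it
            async def _start():
                await asyncio.sleep(0)               # stand-in for real startup work
                self._address = "tcp://127.0.0.1:0"  # placeholder address
                return self
            return _start().__await__()

        async def close(self):
            # Tear down whatever was started in __await__
            self._address = None

Such a class could then be referenced from a spec entry like ``{'cls': EchoWorker, 'options': {...}}``.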
This method overrides distributed.deploy.cluster.Cluster.__init__.
This method is overridden in:
This method overrides distributed.deploy.cluster.Cluster.adapt.
This scales Dask clusters automatically based on scheduler activity.
Minimum number of workers
Maximum number of workers
Minimum number of cores/threads to keep around in the cluster
Maximum number of cores/threads to keep around in the cluster
Minimum amount of memory to keep around in the cluster. Expressed as a string like "100 GiB".
Maximum amount of memory to keep around in the cluster. Expressed as a string like "100 GiB".
>>> cluster.adapt(minimum=0, maximum_memory="100 GiB", interval='500ms')
dask.distributed.Adaptive : for more keyword arguments
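As a slightly fuller sketch, and assuming a ``cluster`` object like the ones constructed in the examples above, adaptivity can be bounded by worker count and by aggregate memory at the same time; the particular bounds here are illustrative only::

    # Let scheduler activity choose the cluster size, but keep it within
    # 2-10 workers and at most roughly 200 GiB of total memory.
    cluster.adapt(minimum=2, maximum=10, maximum_memory="200 GiB")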