Class Scheduler
Dynamic distributed task scheduler
The scheduler tracks the current state of workers, data, and computations.
The scheduler listens for events and responds by controlling workers
appropriately. It continuously tries to use the workers to execute an
ever-growing dask graph.
All events are handled quickly, in linear time with respect to their input
(which is often of constant size) and generally within a millisecond. To
accomplish this the scheduler tracks a lot of state. Every operation
maintains the consistency of this state.
The scheduler communicates with the outside world through Comm objects.
It maintains a consistent and valid view of the world even when listening
to several clients at once.
A Scheduler is typically started either with the ``dask-scheduler``
executable::
$ dask-scheduler
Scheduler started at 127.0.0.1:8786
Or it is started automatically within a LocalCluster when a Client is
created without connection information::
>>> c = Client() # doctest: +SKIP
>>> c.cluster.scheduler # doctest: +SKIP
Scheduler(...)
Users typically do not interact with the scheduler directly but rather
through the ``Client`` object.
**State**
The scheduler contains the following state variables. Each variable is
listed along with what it stores and a brief description.
* **tasks:** ``{task key: TaskState}``
Tasks currently known to the scheduler
* **unrunnable:** ``{TaskState}``
Tasks in the "no-worker" state
* **workers:** ``{worker key: WorkerState}``
Workers currently connected to the scheduler
* **idle:** ``{WorkerState}``:
Set of workers that are not fully utilized
* **saturated:** ``{WorkerState}``:
Set of workers that are over-utilized (have enough work to stay busy)
* **host_info:** ``{hostname: dict}``:
Information about each worker host
* **clients:** ``{client key: ClientState}``
Clients currently connected to the scheduler
* **services:** ``{str: port}``:
Other services running on this scheduler, like Bokeh
* **loop:** ``IOLoop``:
The running Tornado IOLoop
* **client_comms:** ``{client key: Comm}``
For each client, a Comm object used to receive task requests and
report task status updates.
* **stream_comms:** ``{worker key: Comm}``
For each worker, a Comm object from which we both accept stimuli and
report results
* **task_duration:** ``{key-prefix: time}``
Time we expect certain functions to take, e.g. ``{'sum': 0.25}``
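These variables can be inspected live from a client. Below is a minimal
sketch (the ``dump_state`` helper is hypothetical, written for this example)
using ``Client.run_on_scheduler``, which passes the scheduler instance in as
the ``dask_scheduler`` keyword::

from dask.distributed import Client

def dump_state(dask_scheduler=None):
    # Summarize a few of the state variables listed above
    return {
        "n_tasks": len(dask_scheduler.tasks),
        "n_workers": len(dask_scheduler.workers),
        "idle": [ws.address for ws in dask_scheduler.idle],
        "saturated": [ws.address for ws in dask_scheduler.saturated],
    }

client = Client()  # starts a LocalCluster with an in-process scheduler
print(client.run_on_scheduler(dump_state))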
Methods
▶ def __init__(self, loop=None, delete_interval="500ms", synchronize_worker_interval="60s", ...) override
def __init__(
self,
loop=None,
delete_interval="500ms",
synchronize_worker_interval="60s",
services=None,
service_kwargs=None,
allowed_failures=None,
extensions=None,
validate=None,
scheduler_file=None,
security=None,
worker_ttl=None,
idle_timeout=None,
interface=None,
host=None,
port=0,
protocol=None,
dashboard_address=None,
dashboard=None,
http_prefix="/",
preload=None,
preload_argv=(),
plugins=(),
**kwargs,
)
This method overrides distributed.core.Server.__init__.
▶ def adaptive_target(self, comm=None, target_duration=None) Desired number of workers based on the current workload
This looks at the current running tasks and memory use, and returns a
number of desired workers. This is often used by adaptive scheduling.
Parameters
- target_duration : str
A desired duration of time for computations to take. This affects how
rapidly the scheduler will ask to scale.
See also
distributed.deploy.Adaptive
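In practice this is polled by adaptive deployments rather than called by
hand; for example, ``cluster.adapt`` periodically consults
``Scheduler.adaptive_target`` to decide how to scale (a sketch assuming a
local cluster)::

>>> from dask.distributed import LocalCluster  # doctest: +SKIP
>>> cluster = LocalCluster(n_workers=1)  # doctest: +SKIP
>>> cluster.adapt(minimum=1, maximum=10)  # doctest: +SKIP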
▶ def add_keys(self, comm=None, worker=None, keys=()) Learn that a worker has certain keys
This should not be used in practice and is mostly here for legacy
reasons. However, it is sent by workers from time to time.
▶ def add_plugin(self, plugin=None, idempotent=False, **kwargs) Add external plugin to scheduler
See https://distributed.readthedocs.io/en/latest/plugins.html
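For example, a plugin built on the ``SchedulerPlugin`` base class can observe
task transitions (the ``Counter`` class below is a hypothetical
illustration)::

from distributed.diagnostics.plugin import SchedulerPlugin

class Counter(SchedulerPlugin):
    # Count tasks that complete successfully
    def __init__(self):
        self.n_completed = 0

    def transition(self, key, start, finish, *args, **kwargs):
        # Called by the scheduler on every task state transition
        if start == "processing" and finish == "memory":
            self.n_completed += 1

scheduler.add_plugin(Counter())  # attach to a running scheduler instance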
▷ def add_resources(self, comm=None, worker=None, resources=None)
▷ def cancel_key(self, key, client, retries=5, force=False) Cancel a particular key and all dependents
▶ def check_idle_saturated(self, ws, occ=None) Update the status of the idle and saturated state
The scheduler keeps track of workers that are:
- Saturated: have enough work to stay busy
- Idle: do not have enough work to stay busy
They are considered saturated if they both have enough tasks to occupy
all of their threads, and if the expected runtime of those tasks is
large enough.
This is useful for load balancing and adaptivity.
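A rough sketch of this heuristic (simplified; the real check also weighs a
worker's occupancy against the cluster average and uses tuned thresholds)::

def is_saturated(ws, avg_occupancy):
    # Enough queued tasks to occupy every thread ...
    enough_tasks = len(ws.processing) > ws.nthreads
    # ... and the expected runtime of that backlog is large
    long_backlog = ws.occupancy > 2 * avg_occupancy
    return enough_tasks and long_backlog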
▶ def coerce_address(self, addr, resolve=True) Coerce possible input addresses to canonical form. *resolve* can be disabled for testing with fake hostnames.
Handles strings, tuples, or aliases.
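For example, a (host, port) tuple is normalized to a full address
(illustrative values)::

>>> scheduler.coerce_address(('127.0.0.1', 8786))  # doctest: +SKIP
'tcp://127.0.0.1:8786'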
▷ def decide_worker(self, ts) Decide on a worker for task *ts*. Return a WorkerState.
▷ def get_comm_cost(self, ts, ws) Get the estimated communication cost (in s.) to compute the task on the given worker.
▷ def get_metadata(self, comm=None, keys=None, default=no_default)
▷ def get_nbytes(self, comm=None, keys=None, summary=True)
▷ def get_task_duration(self, ts, default=None) Get the estimated computation cost of the given task (not including any communication cost).
▶ def get_worker_service_addr(self, worker, service_name, protocol=False) Get the (host, port) address of the named service on the *worker*. Returns None if the service doesn't exist.
Parameters
- worker : address
- service_name : str
Common services include 'bokeh' and 'nanny'
- protocol : boolean
Whether or not to include a full address with protocol (True)
or just a (host, port) pair
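For example (illustrative addresses and ports)::

>>> scheduler.get_worker_service_addr('tcp://127.0.0.1:36075', 'nanny')  # doctest: +SKIP
('127.0.0.1', 43215)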
▶ def handle_long_running(self, key=None, worker=None, compute_duration=None) A task has seceded from the thread pool
We stop the task from being stolen in the future, and change task
duration accounting as if the task has stopped.
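This stimulus typically originates from a task calling ``distributed.secede``
on a worker, for example::

from distributed import secede

def poll_forever():
    secede()  # leave the worker's thread pool, freeing the slot for other tasks
    while True:
        ...  # long-running work no longer counted against the pool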
▶ def heartbeat_worker(self, comm=None, address=None, resolve_address=True, now=None, ...)
def heartbeat_worker(
self,
comm=None,
address=None,
resolve_address=True,
now=None,
resources=None,
host_info=None,
metrics=None,
)
▶ def identity(self, comm=None) override Basic information about ourselves and our cluster
▷ def new_task(self, key, spec, state) Create a new task, and associated states
▶ def reevaluate_occupancy(self, worker_index=0) Periodically reevaluate task duration time
The expected duration of a task can change over time. Unfortunately we
don't have a good constant-time way to propagate the effects of these
changes out to the summaries that they affect, like the total expected
runtime of each of the workers, or what tasks are stealable.
In this coroutine we walk through all of the workers and re-align their
estimates with the current state of tasks. We do this periodically
rather than at every transition, and we only do it if the scheduler
process isn't under load (using psutil.Process.cpu_percent()). This
lets us avoid this fringe optimization when we have better things to
think about.
▷ def remove_client(self, client=None) Remove client from network
▷ def remove_plugin(self, plugin) Remove external plugin from scheduler
▶ def report(self, msg, ts=None, client=None) Publish updates to all listening Queues and Comms
If the message contains a key then we only send the message to those
comms that care about the key.
▶ def reschedule(self, key=None, worker=None) Reschedule a task
Things may have shifted and this task may now be better suited to run
elsewhere.
▶ def run_function(self, stream, function, args=(), kwargs={}, wait=True) Run a function within this process
See also
Client.run_on_scheduler
▶ def start_ipython(self, comm=None) Start an IPython kernel
Returns Jupyter connection info dictionary.
▷ def stimulus_cancel(self, comm, keys=None, client=None, force=False) Stop execution on a list of keys
▷ def stimulus_missing_data(self, cause=None, key=None, worker=None, ensure=True, **kwargs) Mark that certain keys have gone missing. Recover.
▷ def stimulus_task_erred(self, key=None, worker=None, exception=None, traceback=None, **kwargs) Mark that a task has erred on a particular worker
▷ def stimulus_task_finished(self, key=None, worker=None, **kwargs) Mark that a task has finished execution on a particular worker
▷ def story(self, *keys) Get all transitions that touch one of the input keys
▶ def transition(self, key, finish, *args, **kwargs) Transition a key from its current state to the finish state
Returns
- Dictionary of recommendations for future transitions
Examples
>>> self.transition('x', 'waiting')
{'x': 'processing'}
See also
Scheduler.transitions: transitive version of this function
▶ def transition_processing_memory(self, key, nbytes=None, type=None, typename=None, ...)
def transition_processing_memory(
self,
key,
nbytes=None,
type=None,
typename=None,
worker=None,
startstops=None,
**kwargs,
)
▶ def transitions(self, recommendations) Process transitions until none are left
This includes feedback from previous transitions and continues until we
reach a steady state
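Conceptually this is a small fixed-point loop over ``transition`` (a sketch,
not the actual implementation)::

def transitions(self, recommendations):
    recommendations = dict(recommendations)
    while recommendations:
        key, finish = recommendations.popitem()
        # Each transition may recommend further transitions
        new = self.transition(key, finish)
        recommendations.update(new)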
▶ def update_data(self, comm=None, who_has=None, nbytes=None, client=None, serializers=None) Learn that new data has entered the network from an external source
See also
Scheduler.mark_key_in_memory
▶ def update_graph(self, client=None, tasks=None, keys=None, dependencies=None, ...) Add new computations to the internal dask graph
def update_graph(
self,
client=None,
tasks=None,
keys=None,
dependencies=None,
restrictions=None,
priority=None,
loose_restrictions=None,
resources=None,
submitting_task=None,
retries=None,
user_priority=0,
actors=None,
fifo_timeout=0,
)
This happens whenever the Client calls submit, map, get, or compute.
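For example, each of these client calls sends an ``update-graph`` message
that is handled here::

>>> future = client.submit(sum, [1, 2, 3])  # doctest: +SKIP
>>> futures = client.map(abs, range(10))  # doctest: +SKIP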
▶ def update_graph_hlg(self, client=None, hlg=None, keys=None, dependencies=None, restrictions=None, ...)
def update_graph_hlg(
self,
client=None,
hlg=None,
keys=None,
dependencies=None,
restrictions=None,
priority=None,
loose_restrictions=None,
resources=None,
submitting_task=None,
retries=None,
user_priority=0,
actors=None,
fifo_timeout=0,
)
▶ def valid_workers(self, ts) Return set of currently valid workers for key
If all workers are valid then this returns ``True``.
This checks the following state:
* worker_restrictions
* host_restrictions
* resource_restrictions
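These restrictions originate from placement arguments on the client side
(``func`` and ``x`` below are placeholders); the first call restricts where
the task may run by address or hostname, the second by abstract resources::

>>> client.submit(func, x, workers=['192.168.0.1'])  # doctest: +SKIP
>>> client.submit(func, x, resources={'GPU': 1})  # doctest: +SKIP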
▶ def worker_objective(self, ts, ws) Objective function to determine which worker should get the task
Minimize expected start time. If a tie then break with data storage.
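A simplified sketch of this objective (the real function also special-cases
actors and uses the measured cluster bandwidth)::

def worker_objective(self, ts, ws):
    # Bytes that would have to be transferred to this worker
    comm_bytes = sum(
        dts.nbytes for dts in ts.dependencies if ws not in dts.who_has
    )
    # Expected start time: current backlog plus transfer time
    start_time = ws.occupancy / ws.nthreads + comm_bytes / self.bandwidth
    return (start_time, ws.nbytes)  # tie-break on data already stored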
▶ def worker_send(self, worker, msg) Send message to worker
This also handles connection failures by adding a callback to remove
the worker on the next cycle.
▶ def workers_list(self, workers) List of qualifying workers
Takes a list of worker addresses or hostnames and returns a list of all
matching worker addresses.
▶ def workers_to_close(self, comm=None, memory_ratio=None, n=None, key=None, minimum=None, ...) Find workers that we can close with low cost
def workers_to_close(
self,
comm=None,
memory_ratio=None,
n=None,
key=None,
minimum=None,
target=None,
attribute="address",
)
This returns a list of workers that are good candidates to retire.
These workers are not running anything and are storing
relatively little data compared to their peers. If all workers are
idle then we still maintain enough workers to have enough RAM to store
our data, with a comfortable buffer.
This is for use with systems like ``distributed.deploy.adaptive``.
Parameters
- memory_ratio : Number
Amount of extra space we want to have for our stored data.
Defaults to 2, meaning we want to have twice as much memory as we
currently have data.
- n : int
Number of workers to close
- minimum : int
Minimum number of workers to keep around
- key : Callable(WorkerState)
An optional callable mapping a WorkerState object to a group
affiliation. Groups will be closed together. This is useful when
closing workers must be done collectively, such as by hostname.
- target : int
Target number of workers to have after we close
- attribute : str
The attribute of the WorkerState object to return, like "address"
or "name". Defaults to "address".
Returns
- list of worker addresses that are OK to close
Examples
>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']
Group workers by hostname prior to closing
>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']
Remove two workers
>>> scheduler.workers_to_close(n=2)
Keep enough workers to have twice as much memory as we need.
>>> scheduler.workers_to_close(memory_ratio=2)
See also
Scheduler.retire_workers