Class Worker

Worker node in a Dask distributed cluster

Declaration

class Worker(ServerNode)

Documentation

Workers perform two functions:

1.  **Serve data** from a local dictionary
2.  **Perform computation** on that data and on data from peers

Workers keep the scheduler informed of their data and use that scheduler to
gather data from other workers when necessary to perform a computation.
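
A minimal client-side sketch of these two roles, assuming a scheduler and at
least one worker are already running at ``127.0.0.1:8786`` (the address is
illustrative)::

    from dask.distributed import Client

    client = Client("127.0.0.1:8786")

    # 1. Serve data: scatter stores the value in a worker's local data dict
    [x] = client.scatter([10])

    # 2. Perform computation: the increment runs on a worker, which fetches
    #    x from a peer if it does not already hold it
    y = client.submit(lambda a: a + 1, x)
    print(y.result())  # 11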

You can start a worker with the ``dask-worker`` command line application::

    $ dask-worker scheduler-ip:port

Use the ``--help`` flag to see more options::

    $ dask-worker --help
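
Workers can also be started programmatically.  A minimal sketch using the
async API, with the scheduler run in the same process purely for
illustration::

    import asyncio
    from dask.distributed import Client, Scheduler, Worker

    async def main():
        async with Scheduler() as scheduler:
            async with Worker(scheduler.address) as worker:
                async with Client(scheduler.address, asynchronous=True) as client:
                    # submit a task and await its result
                    result = await client.submit(lambda x: x + 1, 10)
                    print(result)  # 11

    asyncio.run(main())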

The rest of this docstring is about the internal state that the worker uses
to manage and track internal computations.

**State**

**Informational State**

These attributes don't change significantly during execution.

* **nthreads:** ``int``:
    Number of threads used by this worker process
* **executor:** ``concurrent.futures.ThreadPoolExecutor``:
    Executor used to perform computation
* **local_directory:** ``path``:
    Path on local machine to store temporary files
* **scheduler:** ``rpc``:
    Location of scheduler.  See ``.ip/.port`` attributes.
* **name:** ``string``:
    Alias
* **services:** ``{str: Server}``:
    Auxiliary web servers running on this worker
* **service_ports:** ``{str: port}``:
    Ports on which the auxiliary services above are listening
* **total_out_connections**: ``int``
    The maximum number of concurrent outgoing requests for data
* **total_in_connections**: ``int``
    The maximum number of concurrent incoming requests for data
* **total_comm_nbytes**: ``int``
* **batched_stream**: ``BatchedSend``
    A batched stream over which we communicate with the scheduler
* **log**: ``[(message)]``
    A structured and queryable log.  See ``Worker.story``
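
For example, the log can be queried for the entries that mention a particular
key via ``Worker.story``.  A sketch using ``Client.run``, which passes the
``Worker`` instance to functions that accept a ``dask_worker`` argument; the
address and key name here are hypothetical::

    from dask.distributed import Client

    client = Client("127.0.0.1:8786")

    def key_history(dask_worker):
        # dask_worker is filled in by Client.run with the Worker instance;
        # story() returns the log entries that mention the given key
        return dask_worker.story("inc-123abc")  # hypothetical key name

    client.run(key_history)  # {worker_address: [log entries], ...}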

**Volatile State**

These attributes track the progress of tasks that this worker is trying to
complete.  In the descriptions below, a ``key`` is the name of a task that
we want to compute and ``dep`` is the name of a piece of dependent data
that we want to collect from other workers.  A sketch showing how to
inspect this state from a client follows the list below.

* **tasks**: ``{key: TaskState}``
    The tasks currently executing on this worker (and any dependencies of those tasks)
* **data:** ``{key: object}``:
    Prefer using the **host** attribute instead of this, unless
    ``memory_limit`` and at least one of ``memory_target_fraction`` or
    ``memory_spill_fraction`` are defined; in that case this attribute is a
    ``zict.Buffer``, from which information on the LRU cache can be queried.
* **data.memory:** ``{key: object}``:
    Dictionary mapping keys to the actual values stored in memory.  Only
    available if the condition for **data** being a ``zict.Buffer`` is met.
* **data.disk:** ``{key: object}``:
    Dictionary mapping keys to the actual values stored on disk.  Only
    available if the condition for **data** being a ``zict.Buffer`` is met.
* **data_needed**: ``deque(keys)``
    The keys whose data we still lack, arranged in a deque
* **ready**: ``[keys]``
    Keys that are ready to run.  Stored in a LIFO stack
* **constrained**: ``[keys]``
    Keys for which we have the data to run, but are waiting on abstract
    resources like GPUs.  Stored in a FIFO deque
* **executing_count**: ``int``
    A count of tasks currently executing on this worker
* **executed_count**: ``int``
    The number of tasks that this worker has run in its lifetime
* **long_running**: ``{keys}``
    A set of keys of tasks that are running and have started their own
    long-running clients.
* **has_what**: ``{worker: {deps}}``
    The data we care about that we believe each worker holds
* **pending_data_per_worker**: ``{worker: [dep]}``
    The data on each worker that we still want, prioritized as a deque
* **in_flight_tasks**: ``int``
    The number of tasks currently being transferred to us over
    peer-to-peer connections
* **in_flight_workers**: ``{worker: {task}}``
    The workers from which we are currently gathering data and the
    dependencies we expect from those connections
* **comm_bytes**: ``int``
    The total number of bytes in flight
* **nbytes**: ``{key: int}``
    The size of a particular piece of data
* **threads**: ``{key: int}``
    The ID of the thread on which the task ran
* **active_threads**: ``{int: key}``
    The keys currently running on active threads
* **waiting_for_data_count**: ``int``
    A count of how many tasks are currently waiting for data
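
The volatile state above can be inspected from a client in the same way, via
``Client.run``.  A minimal sketch, given the same connected ``client`` as
above and an illustrative choice of attributes::

    def snapshot(dask_worker):
        return {
            "executing_count": dask_worker.executing_count,
            "in_flight_tasks": dask_worker.in_flight_tasks,
            "keys_in_memory": len(dask_worker.data),
            # data is a plain dict unless spilling is configured, in which
            # case it is a zict.Buffer (see the data entry above)
            "data_type": type(dask_worker.data).__name__,
        }

    client.run(snapshot)  # {worker_address: snapshot dict, ...}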

Examples

Use the command line to start a worker::

    $ dask-scheduler
    Start scheduler at 127.0.0.1:8786

    $ dask-worker 127.0.0.1:8786
    Start worker at:               127.0.0.1:1234
    Registered with scheduler at:  127.0.0.1:8786

See also

distributed.scheduler.Scheduler
distributed.nanny.Nanny
