The Client connects users to a Dask cluster. It provides an asynchronous user interface around functions and futures. This class resembles executors in ``concurrent.futures`` but also allows ``Future`` objects within ``submit/map`` calls. When a Client is instantiated it takes over all ``dask.compute`` and ``dask.persist`` calls by default. It is also common to create a Client without specifying the scheduler address, like ``Client()``. In this case the Client creates a :class:`LocalCluster` in the background and connects to that. Any extra keywords are passed from Client to LocalCluster in this case. See the LocalCluster documentation for more information.
This can be the address of a ``Scheduler`` server like a string ``'127.0.0.1:8786'`` or a cluster object like ``LocalCluster()``
Timeout duration for initial connection to the scheduler
Claim this scheduler as the global dask scheduler
Path to a file with scheduler information if available
Optional security information. If creating a local cluster, you can also pass in ``True``, in which case temporary self-signed credentials will be created automatically.
Set to True if using this client within async/await functions or within Tornado gen.coroutines. Otherwise this should remain False for normal use.
Gives the client a name that will be included in logs generated on the scheduler for matters relating to this client
Whether to connect directly to the workers, or to ask the scheduler to serve as an intermediary.
Time in milliseconds between heartbeats to scheduler
If you do not pass a scheduler address, Client will create a ``LocalCluster`` object, passing any extra keyword arguments.
Provide cluster's scheduler node address on initialization:

>>> client = Client('127.0.0.1:8786')  # doctest: +SKIP

Use ``submit`` method to send individual computations to the cluster

>>> a = client.submit(add, 1, 2)  # doctest: +SKIP
>>> b = client.submit(add, 10, 20)  # doctest: +SKIP

Continue using submit or map on results to build up larger computations

>>> c = client.submit(add, a, b)  # doctest: +SKIP

Gather results with the ``gather`` method.

>>> client.gather(c)  # doctest: +SKIP
33

You can also call Client with no arguments in order to create your own local cluster.

>>> client = Client()  # makes your own local "cluster"  # doctest: +SKIP

Extra keywords will be passed directly to LocalCluster

>>> client = Client(processes=False, threads_per_worker=1)  # doctest: +SKIP
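Because an instantiated Client takes over ``dask.compute`` and ``dask.persist`` calls by default, ordinary collection code runs on the cluster without further changes. A minimal sketch, assuming ``dask.array`` is available:

>>> import dask.array as da  # doctest: +SKIP
>>> client = Client()  # doctest: +SKIP
>>> x = da.ones((1000, 1000), chunks=(100, 100))  # doctest: +SKIP
>>> x.sum().compute()  # routed to the cluster via the active client  # doctest: +SKIP
1000000.0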
distributed.scheduler.Scheduler: Internal scheduler
distributed.LocalCluster:
This method is overridden in:
This is true if the user signaled that we might be when creating the client as in the following::

    client = Client(asynchronous=True)

However, we override this expectation if we can definitively tell that we are running from a thread that is not the event loop. This is common when calling get_client() from within a worker task. Even though the client was originally created in asynchronous mode we may find ourselves in contexts where it is better to operate synchronously.
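As an illustrative sketch of the asynchronous mode described above, a client created with ``asynchronous=True`` is awaited inside a coroutine and its futures are awaited rather than blocking:

>>> import asyncio  # doctest: +SKIP
>>> async def main():  # doctest: +SKIP
...     client = await Client(asynchronous=True)
...     future = client.submit(lambda x: x + 1, 10)
...     result = await future  # await the future instead of blocking
...     await client.close()
...     return result
>>> asyncio.get_event_loop().run_until_complete(main())  # doctest: +SKIP
11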
You can specify data of interest either by providing futures or collections in the ``futures=`` keyword or a list of explicit keys in the ``keys=`` keyword. If neither is provided then all call stacks will be returned.
List of futures, defaults to all data
List of key names, defaults to all data
>>> df = dd.read_parquet(...).persist()  # doctest: +SKIP
>>> client.call_stack(df)  # call on collections  # doctest: +SKIP
>>> client.call_stack()  # or call with no arguments for all activity  # doctest: +SKIP
This stops future tasks from being scheduled if they have not yet run and deletes them if they have already run. After calling, this result and all dependent results will no longer be accessible
Cancel this future even if other clients desire it
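A short sketch of cancelling work; ``slow_function`` here is purely illustrative:

>>> future = client.submit(slow_function, 1)  # doctest: +SKIP
>>> client.cancel(future)  # the task is dropped if it has not yet run  # doctest: +SKIP
>>> future.cancelled()  # doctest: +SKIP
True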
Collections like dask.array or dask.dataframe, or dask.delayed objects
Returns Futures if False (default) or concrete values if True
Whether or not to optimize the underlying graphs
Which workers can run which parts of the computation. If a string or list then the output collections will run on the listed workers, but other sub-computations can run anywhere. If a dict then keys should be (tuples of) collections or task keys and values should be addresses or lists.
If True then all restrictions in workers= are considered loose. If a list then only the keys for the listed collections are loose.
Number of allowed automatic retries if computing a result fails
Optional prioritization of task. Zero is default. Higher priorities take precedence
Allowed amount of time between calls to consider the same priority
By default dask traverses builtin python collections looking for dask objects passed to ``compute``. For large collections this can be expensive. If none of the arguments contain any dask objects, set ``traverse=False`` to avoid doing this traversal.
Defines the `resources` these tasks require on the worker. Can specify global resources (``{'GPU': 2}``), or per-task resources (``{'x': {'GPU': 1}, 'y': {'SSD': 4}}``), but not both. See :doc:`worker resources <resources>` for details on defining resources.
Whether these tasks should exist on the worker as stateful actors. Specified on a global (True/False) or per-task (``{'x': True, 'y': False}``) basis. See :doc:`actors` for additional details.
Options to pass to the graph optimize calls
>>> from dask import delayed
>>> from operator import add
>>> x = delayed(add)(1, 2)
>>> y = delayed(add)(x, x)
>>> xx, yy = client.compute([x, y])  # doctest: +SKIP
>>> xx  # doctest: +SKIP
<Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
>>> xx.result()  # doctest: +SKIP
3
>>> yy.result()  # doctest: +SKIP
6

Also support single arguments

>>> xx = client.compute(x)  # doctest: +SKIP
Client.get: Normal synchronous dask.get function
Accepts a future, nested container of futures, iterator, or queue. The return type will match the input type.
This can be a possibly nested collection of Future objects. Collections can be lists, sets, or dictionaries
Either 'raise' or 'skip': whether to raise if a future has erred, or to skip its inclusion in the output collection
Whether to connect directly to the workers, or to ask the scheduler to serve as an intermediary. This can also be set when creating the Client.
>>> from operator import add  # doctest: +SKIP
>>> c = Client('127.0.0.1:8787')  # doctest: +SKIP
>>> x = c.submit(add, 1, 2)  # doctest: +SKIP
>>> c.gather(x)  # doctest: +SKIP
3
>>> c.gather([x, [x], x])  # support lists and dicts  # doctest: +SKIP
[3, [3], 3]
Client.scatter: Send data out to cluster
A mapping of {key: {set of worker hostnames}} that restricts where jobs can take place
Number of allowed automatic retries if computing a result fails
Optional prioritization of task. Zero is default. Higher priorities take precedence
Returns Futures if False or concrete values if True (default).
Whether to connect directly to the workers, or to ask the scheduler to serve as an intermediary. This can also be set when creating the Client.
>>> from operator import add  # doctest: +SKIP
>>> c = Client('127.0.0.1:8787')  # doctest: +SKIP
>>> c.get({'x': (add, 1, 2)}, 'x')  # doctest: +SKIP
3
Client.compute: Compute asynchronous collections
See set_metadata for the full docstring with examples
Key to access. If a list then gets within a nested collection
If the key does not exist then return this value instead. If not provided then this raises a KeyError if the key is not present
This collects the data present in the diagnostic "Task Stream" plot on the dashboard. It includes the start, stop, transfer, and deserialization time of every task for a particular duration. Note that the task stream diagnostic does not run by default. You may wish to call this function once before you start work to ensure that things start recording, and then again after you have completed.
When you want to start recording. If a number it should be the result of calling time(). If a string then it should be a time difference before now, like '60s' or '500 ms'.
When you want to stop recording
The number of desired records, ignored if both start and stop are specified
If true then also return a Bokeh figure. If plot == 'save' then save the figure to a file.
The filename to save to if you set ``plot='save'``
Specifies if the resource component is INLINE or CDN
>>> client.get_task_stream()  # prime plugin if not already connected
>>> x.compute()  # do some work
>>> client.get_task_stream()
[{'task': ..., 'type': ..., 'thread': ..., ...}]

Pass the ``plot=True`` or ``plot='save'`` keywords to get back a Bokeh figure

>>> data, figure = client.get_task_stream(plot='save', filename='myfile.html')

Alternatively consider the context manager

>>> from dask.distributed import get_task_stream
>>> with get_task_stream() as ts:
...     x.compute()
>>> ts.data
[...]
get_task_stream: a context manager version of this method
Number of logs to retrieve. Maxes out at 10000 by default, configurable in config.yaml::log-length
List of worker addresses to retrieve. Gets all workers by default.
Whether to get the logs from the workers (False) or the nannies (True). If specified, the addresses in `workers` should still be the worker addresses, not the nanny addresses.
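A brief sketch of pulling logs back to the client; the method name ``get_worker_logs`` and the worker address shown are assumed for illustration:

>>> logs = client.get_worker_logs(n=100)  # doctest: +SKIP
>>> list(logs)  # dictionary keyed by worker address  # doctest: +SKIP
['tcp://127.0.0.1:36873']
>>> nanny_logs = client.get_worker_logs(nanny=True)  # logs from the nanny processes instead  # doctest: +SKIP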
This returns the keys of the data that are held in each worker's memory.
A list of worker addresses, defaults to all
>>> x, y, z = c.map(inc, [1, 2, 3])  # doctest: +SKIP
>>> wait([x, y, z])  # doctest: +SKIP
>>> c.has_what()  # doctest: +SKIP
{'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea',
                         'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b',
                         'inc-1e297fc27658d7b67b3a758f16bcf47a']}
Client.who_has
Client.nthreads
Client.processing
Arguments can be normal objects or Futures
List-like objects to map over. They should have the same length.
Prefix for task names if string. Explicit names if list.
Whether or not the function is pure. Set ``pure=False`` for impure functions like ``np.random.random``.
A set of worker hostnames on which computations may be performed. Leave empty to default to all workers (common case)
Used with `workers`. Indicates whether or not the computations may be performed on workers that are not in the `workers` set(s).
Number of allowed automatic retries if a task fails
Optional prioritization of task. Zero is default. Higher priorities take precedence
Allowed amount of time between calls to consider the same priority
Defines the `resources` each instance of this mapped task requires on the worker; e.g. ``{'GPU': 2}``. See :doc:`worker resources <resources>` for details on defining resources.
Whether these tasks should exist on the worker as stateful actors. See :doc:`actors` for additional details.
Alias for `actor`
Submit tasks to the scheduler in batches of (at most) ``batch_size``. Larger batch sizes can be useful for very large ``iterables``, as the cluster can start processing tasks while later ones are submitted asynchronously.
Extra keywords to send to the function. Large values will be included explicitly in the task graph.
>>> L = client.map(func, sequence) # doctest: +SKIP
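The ``batch_size=`` keyword described above can help with very long inputs; a hedged sketch, assuming ``inc`` is defined:

>>> futures = client.map(inc, range(100000), batch_size=1000)  # submit in chunks of 1000 tasks  # doctest: +SKIP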
This is as measured by ``sys.getsizeof`` which may not accurately reflect the true cost.
A list of keys, defaults to all keys
Summarize keys into key types
>>> x, y, z = c.map(inc, [1, 2, 3])  # doctest: +SKIP
>>> c.nbytes(summary=False)  # doctest: +SKIP
{'inc-1c8dd6be1c21646c71f76c16d09304ea': 28,
 'inc-1e297fc27658d7b67b3a758f16bcf47a': 28,
 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28}
>>> c.nbytes(summary=True)  # doctest: +SKIP
{'inc': 84}
Client.who_has
This normalizes the tasks within a collections task graph against the known futures within the scheduler. It returns a copy of the collection with a task graph that includes the overlapping futures.
>>> len(x.__dask_graph__())  # x is a dask collection with 100 tasks  # doctest: +SKIP
100
>>> set(client.futures).intersection(x.__dask_graph__())  # some overlap exists  # doctest: +SKIP
10
>>> x = client.normalize_collection(x)  # doctest: +SKIP
>>> len(x.__dask_graph__())  # smaller computational graph  # doctest: +SKIP
20
Client.persist: trigger computation of collection's tasks
A list of workers that we care about specifically. Leave empty to receive information about all workers.
>>> c.threads()  # doctest: +SKIP
{'192.168.1.141:46784': 8,
 '192.167.1.142:47548': 8,
 '192.167.1.143:47329': 8,
 '192.167.1.144:37297': 8}
Client.who_has
Client.has_what
Starts computation of the collection on the cluster in the background. Provides a new dask collection that is semantically identical to the previous one, but now based off of futures currently in execution.
Collections like dask.array or dask.dataframe, or dask.delayed objects
Whether or not to optimize the underlying graphs
Which workers can run which parts of the computation. If a string or list then the output collections will run on the listed workers, but other sub-computations can run anywhere. If a dict then keys should be (tuples of) collections or task keys and values should be addresses or lists.
If True then all restrictions in workers= are considered loose. If a list then only the keys for the listed collections are loose.
Number of allowed automatic retries if computing a result fails
Optional prioritization of task. Zero is default. Higher priorities take precedence
Allowed amount of time between calls to consider the same priority
Defines the `resources` these tasks require on the worker. Can specify global resources (``{'GPU': 2}``), or per-task resources (``{'x': {'GPU': 1}, 'y': {'SSD': 4}}``), but not both. See :doc:`worker resources <resources>` for details on defining resources.
Whether these tasks should exist on the worker as stateful actors. Specified on a global (True/False) or per-task (``{'x': True, 'y': False}``) basis. See :doc:`actors` for additional details.
Options to pass to the graph optimize calls
>>> xx = client.persist(x)  # doctest: +SKIP
>>> xx, yy = client.persist([x, y])  # doctest: +SKIP
Client.compute
A list of worker addresses, defaults to all
>>> x, y, z = c.map(inc, [1, 2, 3])  # doctest: +SKIP
>>> c.processing()  # doctest: +SKIP
{'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea',
                         'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b',
                         'inc-1e297fc27658d7b67b3a758f16bcf47a']}
Client.who_has
Client.has_what
Client.nthreads
Key prefix to select; this is typically a function name like 'inc'. Leave as None to collect all data.
List of workers to restrict profile information
If true, return the profile of the worker's administrative thread rather than the worker threads. This is useful when profiling Dask itself, rather than user code.
If true, return the profile information from the scheduler's administrative thread rather than the workers. This is useful when profiling Dask's scheduling itself.
Whether or not to return a plot object
Filename to save the plot
>>> client.profile()  # call on collections
>>> client.profile(filename='dask-profile.html')  # save to html file
This stores a named reference to a dask collection or list of futures on the scheduler. These references are available to other Clients which can download the collection or futures with ``get_dataset``. Datasets are not immediately computed. You may wish to call ``Client.persist`` prior to publishing a dataset.
If True, override any already present dataset with the same name
named collections to publish on the scheduler
Publishing client:

>>> df = dd.read_csv('s3://...')  # doctest: +SKIP
>>> df = c.persist(df)  # doctest: +SKIP
>>> c.publish_dataset(my_dataset=df)  # doctest: +SKIP

Alternative invocation

>>> c.publish_dataset(df, name='my_dataset')  # doctest: +SKIP

Receiving client:

>>> c.list_datasets()  # doctest: +SKIP
['my_dataset']
>>> df2 = c.get_dataset('my_dataset')  # doctest: +SKIP
Client.list_datasets
Client.get_dataset
Client.unpublish_dataset
Client.persist
Move data between workers to roughly balance memory burden. This either affects a subset of the keys/workers or the entire network, depending on keyword arguments. This operation is generally not well tested against normal operation of the scheduler. It is not recommended to use it while waiting on computations.
A list of futures to balance, defaults to all data
A list of workers on which to balance, defaults to all workers
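A minimal sketch of both forms; the futures ``x`` and ``y`` and the worker address are illustrative only:

>>> client.rebalance()  # even out memory across all workers  # doctest: +SKIP
>>> client.rebalance([x, y], workers=['tcp://127.0.0.1:36873'])  # restrict to specific keys and workers  # doctest: +SKIP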
This registers a new setup function for workers in this cluster. The function will run immediately on all currently connected workers. It will also be run upon connection by any workers that are added in the future. Multiple setup functions can be registered - these will be called in the order they were added. If the function takes an input argument named ``dask_worker`` then that variable will be populated with the worker itself.
Function to register and run on all workers
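A hedged sketch of a setup callback; the optional ``dask_worker`` argument receives the worker instance, and the logging tweak shown is just an example:

>>> def setup_logging(dask_worker):  # doctest: +SKIP
...     import logging
...     logging.getLogger('distributed.worker').setLevel(logging.DEBUG)
>>> client.register_worker_callbacks(setup_logging)  # runs now and on any future workers  # doctest: +SKIP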
This registers a new object to handle setup, task state transitions and teardown for workers in this cluster. The plugin will instantiate itself on all currently connected workers. It will also be run on any worker that connects in the future. The plugin may include methods ``setup``, ``teardown``, ``transition``, ``release_key``, and ``release_dep``. See the ``dask.distributed.WorkerPlugin`` class or the examples below for the interface and docstrings. It must be serializable with the pickle or cloudpickle modules. If the plugin has a ``name`` attribute, or if the ``name=`` keyword is used then that will control idempotency. If a plugin with that name has already been registered then any future plugins will not run. For alternatives to plugins, you may also wish to look into preload scripts.
The plugin object to pass to the workers
A name for the plugin. Registering a plugin with the same name will have no effect.
If you pass a class as the plugin, instead of a class instance, then the class will be instantiated with any extra keyword arguments.
>>> class MyPlugin(WorkerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     def setup(self, worker: dask.distributed.Worker):
...         pass
...     def teardown(self, worker: dask.distributed.Worker):
...         pass
...     def transition(self, key: str, start: str, finish: str, **kwargs):
...         pass
...     def release_key(self, key: str, state: str, cause: Optional[str], reason: None, report: bool):
...         pass
...     def release_dep(self, dep: str, state: str, report: bool):
...         pass

>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_worker_plugin(plugin)

You can get access to the plugin with the ``get_worker`` function

>>> client.register_worker_plugin(other_plugin, name='my-plugin')
>>> def f():
...     worker = get_worker()
...     plugin = worker.plugins['my-plugin']
...     return plugin.my_state
>>> future = client.run(f)
distributed.WorkerPlugin
Copy data onto many workers. This helps to broadcast frequently accessed data and it helps to improve resilience. This performs a tree copy of the data throughout the network individually on each piece of data. This operation blocks until complete. It does not guarantee replication of data to future workers.
Futures we wish to replicate
Number of processes on the cluster on which to replicate the data. Defaults to all.
Workers on which we want to restrict the replication. Defaults to all.
The number of workers that can copy data in each generation
>>> x = c.submit(func, *args)  # doctest: +SKIP
>>> c.replicate([x])  # send to all workers  # doctest: +SKIP
>>> c.replicate([x], n=3)  # send to three workers  # doctest: +SKIP
>>> c.replicate([x], workers=['alice', 'bob'])  # send to specific  # doctest: +SKIP
>>> c.replicate([x], n=1, workers=['alice', 'bob'])  # send to one of specific workers  # doctest: +SKIP
>>> c.replicate([x], n=1)  # reduce replications  # doctest: +SKIP

See also
--------
Client.rebalance
See dask.distributed.Scheduler.retire_workers for the full docstring.
You can get information about active workers using the following:

>>> workers = client.scheduler_info()['workers']

From that list you may want to select some workers to close

>>> client.retire_workers(workers=['tcp://address:port', ...])
dask.distributed.Scheduler.retire_workers
This calls a function on all currently known workers immediately, blocks until those results come back, and returns the results asynchronously as a dictionary keyed by worker address. This method is generally used for side effects, such as collecting diagnostic information or installing libraries. If your function takes an input argument named ``dask_worker`` then that variable will be populated with the worker itself.
Workers on which to run the function. Defaults to all known workers.
If the function is asynchronous, whether or not to wait until that function finishes.
Whether to run ``function`` on the nanny. By default, the function is run on the worker process. If specified, the addresses in ``workers`` should still be the worker addresses, not the nanny addresses.
>>> c.run(os.getpid)  # doctest: +SKIP
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321,
 '192.168.0.102:9000': 5555}

Restrict computation to particular workers with the ``workers=`` keyword argument.

>>> c.run(os.getpid, workers=['192.168.0.100:9000',
...                           '192.168.0.101:9000'])  # doctest: +SKIP
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321}

>>> def get_status(dask_worker):
...     return dask_worker.status

>>> c.run(get_status)  # doctest: +SKIP
{'192.168.0.100:9000': 'running',
 '192.168.0.101:9000': 'running'}

Run asynchronous functions in the background:

>>> async def print_state(dask_worker):  # doctest: +SKIP
...     while True:
...         print(dask_worker.status)
...         await asyncio.sleep(1)

>>> c.run(print_state, wait=False)  # doctest: +SKIP
This spawns a coroutine on all currently known workers and then waits for the coroutine on each worker. The coroutines' results are returned as a dictionary keyed by worker address.
(typically a function wrapped in gen.coroutine or a Python 3.5+ async function)
Whether to wait for coroutines to end.
Workers on which to run the function. Defaults to all known workers.
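A hedged sketch, assuming the method is ``Client.run_coroutine`` and the worker addresses shown are illustrative:

>>> import asyncio  # doctest: +SKIP
>>> async def wait_then_report(delay):  # doctest: +SKIP
...     await asyncio.sleep(delay)
...     return delay
>>> client.run_coroutine(wait_then_report, 1)  # doctest: +SKIP
{'tcp://127.0.0.1:36873': 1, 'tcp://127.0.0.1:44589': 1}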
This is typically used for live debugging. The function should take a keyword argument ``dask_scheduler=``, which will be given the scheduler object itself.
>>> def get_number_of_tasks(dask_scheduler=None):
...     return len(dask_scheduler.tasks)

>>> client.run_on_scheduler(get_number_of_tasks)  # doctest: +SKIP
100

Run asynchronous functions in the background:

>>> async def print_state(dask_scheduler):  # doctest: +SKIP
...     while True:
...         print(dask_scheduler.status)
...         await asyncio.sleep(1)

>>> c.run_on_scheduler(print_state, wait=False)  # doctest: +SKIP
Client.run: Run a function on all workers
Client.start_ipython_scheduler: Start an IPython session on scheduler
This moves data from the local client process into the workers of the distributed scheduler. Note that it is often better to submit jobs to your workers to have them load the data rather than loading data locally and then scattering it out to them.
Data to scatter out to workers. Output type matches input type.
Optionally constrain locations of data. Specify workers as hostname/port pairs, e.g. ``('127.0.0.1', 8787)``.
Whether to send each data element to all workers. By default we round-robin based on number of cores.
Whether to connect directly to the workers, or to ask the scheduler to serve as an intermediary. This can also be set when creating the Client.
Whether or not to hash data to determine key. If False then this uses a random key
>>> c = Client('127.0.0.1:8787')  # doctest: +SKIP
>>> c.scatter(1)  # doctest: +SKIP
<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>
>>> c.scatter([1, 2, 3])  # doctest: +SKIP
[<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>,
 <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>,
 <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>]
>>> c.scatter({'x': 1, 'y': 2, 'z': 3})  # doctest: +SKIP
{'x': <Future: status: finished, key: x>,
 'y': <Future: status: finished, key: y>,
 'z': <Future: status: finished, key: z>}

Constrain location of data to subset of workers

>>> c.scatter([1, 2, 3], workers=[('hostname', 8788)])  # doctest: +SKIP

Broadcast data to all workers

>>> [future] = c.scatter([element], broadcast=True)  # doctest: +SKIP

Send scattered data to parallelized function using client futures interface

>>> data = c.scatter(data, broadcast=True)  # doctest: +SKIP
>>> res = [c.submit(func, data, i) for i in range(100)]
Client.gather: Gather data back to local process
>>> c.scheduler_info()  # doctest: +SKIP
{'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996',
 'services': {},
 'type': 'Scheduler',
 'workers': {'127.0.0.1:40575': {'active': 0,
                                 'last-seen': 1472038237.4845693,
                                 'name': '127.0.0.1:40575',
                                 'services': {},
                                 'stored': 0,
                                 'time-delay': 0.0061032772064208984}}}
This allows you to store small amounts of data on the central scheduler process for administrative purposes. Data should be msgpack serializable (ints, strings, lists, dicts).

If the key corresponds to a task then that key will be cleaned up when the task is forgotten by the scheduler.

If the key is a list then it will be assumed that you want to index into a nested dictionary structure using those keys. For example if you call the following::

    >>> client.set_metadata(['a', 'b', 'c'], 123)

Then this is the same as setting::

    >>> scheduler.task_metadata['a']['b']['c'] = 123

The lower level dictionaries will be created on demand.
>>> client.set_metadata('x', 123)  # doctest: +SKIP
>>> client.get_metadata('x')  # doctest: +SKIP
123

>>> client.set_metadata(['x', 'y'], 123)  # doctest: +SKIP
>>> client.get_metadata('x')  # doctest: +SKIP
{'y': 123}

>>> client.set_metadata(['x', 'w', 'z'], 456)  # doctest: +SKIP
>>> client.get_metadata('x')  # doctest: +SKIP
{'y': 123, 'w': {'z': 456}}

>>> client.get_metadata(['x', 'w'])  # doctest: +SKIP
{'z': 456}
get_metadata
If defined, register IPython magic with this name for executing code on the scheduler. If not defined, register %scheduler magic if IPython is running.
If True, launch a Jupyter QtConsole connected to the worker(s).
Additional arguments to pass to the qtconsole on startup.
connection_info dict containing info necessary to connect Jupyter clients to the scheduler.
>>> c.start_ipython_scheduler()  # doctest: +SKIP
>>> %scheduler scheduler.processing  # doctest: +SKIP
{'127.0.0.1:3595': {'inc-1', 'inc-2'},
 '127.0.0.1:53589': {'inc-2', 'add-5'}}

>>> c.start_ipython_scheduler(qtconsole=True)  # doctest: +SKIP
Client.start_ipython_workers: Start IPython on the workers
A list of worker addresses, defaults to all
If defined, register IPython magics with these names for executing code on the workers. If the string has an asterisk then expand the asterisk into 0, 1, ..., n for n workers.
If True, launch a Jupyter QtConsole connected to the worker(s).
Additional arguments to pass to the qtconsole on startup.
List of connection_info dicts containing info necessary to connect Jupyter clients to the workers.
>>> info = c.start_ipython_workers()  # doctest: +SKIP
>>> %remote info['192.168.1.101:5752'] worker.data  # doctest: +SKIP
{'x': 1, 'y': 100}

>>> c.start_ipython_workers('192.168.1.101:5752', magic_names='w')  # doctest: +SKIP
>>> %w worker.data  # doctest: +SKIP
{'x': 1, 'y': 100}

>>> c.start_ipython_workers('192.168.1.101:5752', qtconsole=True)  # doctest: +SKIP

Add an asterisk * in magic names to add one magic per worker

>>> c.start_ipython_workers(magic_names='w_*')  # doctest: +SKIP
>>> %w_0 worker.data  # doctest: +SKIP
{'x': 1, 'y': 100}
>>> %w_1 worker.data  # doctest: +SKIP
{'z': 5}
Client.start_ipython_scheduler: start ipython on the scheduler
Whether or not the function is pure. Set ``pure=False`` for impure functions like ``np.random.random``.
A set of worker hostnames on which computations may be performed. Leave empty to default to all workers (common case)
Unique identifier for the task. Defaults to function-name and hash
Used with `workers`. Indicates whether or not the computations may be performed on workers that are not in the `workers` set(s).
Number of allowed automatic retries if the task fails
Optional prioritization of task. Zero is default. Higher priorities take precedence
Allowed amount of time between calls to consider the same priority
Defines the `resources` this job requires on the worker; e.g. ``{'GPU': 2}``. See :doc:`worker resources <resources>` for details on defining resources.
Whether this task should exist on the worker as a stateful actor. See :doc:`actors` for additional details.
Alias for `actor`
>>> c = client.submit(add, a, b) # doctest: +SKIP
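A sketch combining a few of the keywords documented above; the function ``process``, the input ``x``, and the ``'GPU'`` resource name are illustrative only:

>>> future = client.submit(process, x, resources={'GPU': 1}, priority=10, retries=3)  # doctest: +SKIP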
Client.map: Submit on many arguments at once
This sends a local file up to all worker nodes. This file is placed into a temporary directory on Python's system path so any .py, .egg or .zip files will be importable.
Filename of .py, .egg or .zip file to send to workers
>>> client.upload_file('mylibrary.egg') # doctest: +SKIP >>> from mylibrary import myfunc # doctest: +SKIP >>> L = client.map(myfunc, seq) # doctest: +SKIP
A list of futures, defaults to all data
>>> x, y, z = c.map(inc, [1, 2, 3])  # doctest: +SKIP
>>> wait([x, y, z])  # doctest: +SKIP
>>> c.who_has()  # doctest: +SKIP
{'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'],
 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']}

>>> c.who_has([x, y])  # doctest: +SKIP
{'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}
Client.has_what
Client.nthreads
This facilitates easy sharing of scheduler information using a file system. The scheduler file can be used to instantiate a second Client using the same scheduler.
Path to write the scheduler file.
>>> client = Client()  # doctest: +SKIP
>>> client.write_scheduler_file('scheduler.json')  # doctest: +SKIP

# connect to previous client's scheduler
>>> client2 = Client(scheduler_file='scheduler.json')  # doctest: +SKIP