Class Semaphore
Declaration
class Semaphore
Documentation
This `semaphore <https://en.wikipedia.org/wiki/Semaphore_(programming)>`_
tracks leases on the scheduler, which instances of this class can acquire and
release. If the maximum number of leases has already been acquired, no further
lease can be granted and the caller waits until another lease has been
released.
The lifetime of leases is controlled using a timeout. The ``Client`` of this
instance refreshes this timeout at regular intervals, which provides
protection from deadlocks or resource starvation in case of worker failure.
The timeout can be controlled using the configuration option
``distributed.scheduler.locks.lease-timeout`` and the interval in which the
scheduler verifies the timeout is set using the option
``distributed.scheduler.locks.lease-validation-interval``.
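The interplay between lease refreshes, the lease timeout, and the periodic validation can be sketched with a small toy model. This is only an illustration under simplifying assumptions, not the scheduler's actual code: the class ``ToyLeaseTable`` and its methods are hypothetical names, time is passed in explicitly as plain numbers, and the value 30 is arbitrary.

```python
import itertools


class ToyLeaseTable:
    """Toy model of lease bookkeeping; NOT the scheduler's actual code."""

    def __init__(self, max_leases, lease_timeout):
        self.max_leases = max_leases
        self.lease_timeout = lease_timeout  # plays the role of lease-timeout
        self._last_refresh = {}  # lease id -> time of the last refresh
        self._ids = itertools.count()

    def acquire(self, now):
        """Grant a lease id, or return None if all leases are taken."""
        if len(self._last_refresh) >= self.max_leases:
            return None
        lease_id = next(self._ids)
        self._last_refresh[lease_id] = now
        return lease_id

    def refresh(self, lease_id, now):
        """What a healthy lease holder does at regular intervals."""
        if lease_id in self._last_refresh:
            self._last_refresh[lease_id] = now

    def validate(self, now):
        """Periodic check (cf. lease-validation-interval): drop stale leases."""
        stale = [
            lease_id
            for lease_id, seen in self._last_refresh.items()
            if now - seen > self.lease_timeout
        ]
        for lease_id in stale:
            del self._last_refresh[lease_id]
        return stale


table = ToyLeaseTable(max_leases=1, lease_timeout=30)
crashed = table.acquire(now=0)      # the holder dies right after acquiring
blocked = table.acquire(now=10)     # None: the only lease is still held
reclaimed = table.validate(now=40)  # 40 s without a refresh exceeds 30 s
granted = table.acquire(now=40)     # a lease is available again
```

In this sketch the lease of the crashed holder is reclaimed on the first validation pass after the timeout has elapsed, so other callers are not blocked forever.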
A notable difference from the Semaphore of the Python standard library is
that this implementation does not allow releasing more often than it was
acquired. If this happens, a warning is emitted but the internal state is
not modified.
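For comparison, the snippet below shows what the standard library does on an extra release. It uses only ``threading`` and does not touch the distributed ``Semaphore``; the variable names are chosen for illustration.

```python
import threading

# Plain Semaphore: an extra release() silently raises the internal counter.
plain = threading.Semaphore(1)
plain.release()  # released without a matching acquire
extra_slots = 0
while plain.acquire(blocking=False):  # drain the counter without blocking
    extra_slots += 1
# extra_slots is now 2, although the semaphore was created with a value of 1

# BoundedSemaphore: the same mistake raises ValueError instead.
bounded = threading.BoundedSemaphore(1)
try:
    bounded.release()
    over_release_rejected = False
except ValueError:
    over_release_rejected = True
```

Going by the description above, the distributed implementation sits between these two: it leaves its state unchanged like ``BoundedSemaphore``, but reports the problem with a warning.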
.. warning::
This implementation is still in an experimental state and subtle
changes in behavior may occur without any change in the major version
of this library.
.. warning::
This implementation is susceptible to lease overbooking in case of
lease timeouts. It is advised to monitor log information and adjust the
above configuration options to values suitable for the user application.
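One plausible way overbooking can arise is sketched below as plain arithmetic on a timeline. The scenario is an assumption made for illustration (a holder keeps working while its refreshes no longer reach the scheduler), and all numbers and variable names are made up; none of them are dask defaults.

```python
# Illustrative numbers only; none of these are dask defaults.
max_leases = 1
lease_timeout = 30  # seconds without a refresh before a lease is dropped

# Task A acquires the only lease at t=0 and works until t=100, but its
# refreshes stop reaching the scheduler after t=0 (e.g. a network problem).
a_start, a_stop, a_last_refresh = 0, 100, 0

# At t=40 the scheduler validates leases and finds A's lease stale.
validation_time = 40
a_lease_dropped = (validation_time - a_last_refresh) > lease_timeout

# The freed lease is handed to task B, which starts working immediately.
b_start = validation_time if a_lease_dropped else a_stop

# Count how many tasks are using the resource at t=50.
t = 50
active = int(a_start <= t < a_stop) + int(b_start <= t)
overbooked = active > max_leases
```

Here two tasks use the resource at the same time even though ``max_leases`` is 1. In this sketch, a longer lease timeout makes that less likely, at the cost of slower recovery after a real failure.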
Attributes
- max_leases : int
The maximum number of leases that may be granted at the same time. This
effectively sets an upper limit on the number of parallel accesses to a specific resource.
Defaults to 1.
- name : string
Name of the semaphore to acquire. Choosing the same name allows two
disconnected processes to coordinate. If not given, a random
name will be generated.
- client : Client
Client to use for communication with the scheduler. If not given, the
default global client will be used.
- register : bool
If True, register the semaphore with the scheduler. This needs to be
done before any leases can be acquired. If not done during
initialization, this can also be done by calling the register method of
this class.
When registering, this needs to be awaited.
Examples
>>> from distributed import Client, Semaphore
>>> client = Client()  # assumes a local cluster is acceptable here
>>> sem = Semaphore(max_leases=2, name='my_database')
>>>
>>> def access_resource(s, sem):
...     # Entering the context manager acquires a lease from the semaphore
...     # (waiting until one is available); leaving it releases the lease.
...     with sem:
...         pass
...
>>> futures = client.map(access_resource, range(10), sem=sem)
>>> client.gather(futures)
>>> # Once done, close the semaphore to clean up the state on the scheduler side.
>>> sem.close()
Notes
If a client attempts to release the semaphore but doesn't have a lease acquired, this will raise an exception.
After a semaphore has been closed, a client's attempt to use it has the following effect:
- Acquire a lease: an exception will be raised.
- Release: a warning will be logged.
- Close: nothing will happen.
By default, dask executes functions assuming they are pure. A function that acquires or releases a
semaphore *does* in fact have side effects and can therefore no longer be considered pure.
If this is not taken into account, unexpected behavior may result.
Methods
▷ def __init__(self, max_leases=1, name=None, client=None, register=True)
▶ def acquire(self, timeout=None) Acquire a semaphore.
If the internal counter is greater than zero, decrement it by one and return True immediately.
If it is zero, wait until a release() is called and return True.
Parameters
- timeout : number or string or timedelta
Seconds to wait on acquiring the semaphore. This does not
include local coroutine time, network transfer time, etc.
Instead of a number of seconds, it is also possible to specify
a timedelta in string format, e.g. "200ms".
▷ def get_value(self) Return the number of currently registered leases.
▶ def register(self) Register the semaphore on the scheduler side.
This ensures that all necessary data structures exist on the scheduler.
▶ def release(self) Release the semaphore.
Returns
- bool
This value indicates whether a lease was released immediately or not. Note that a user should *not* retry
this operation. Under certain circumstances (e.g. scheduler overload) the lease may not be released
immediately, but it will always be automatically released after a specific interval configured using
"distributed.scheduler.locks.lease-validation-interval" and "distributed.scheduler.locks.lease-timeout".
Reexports