# Source code: distributed/tests/test_locks.py

import pickle
from time import sleep
from datetime import timedelta

import pytest

from distributed import Lock, get_client, Client
from distributed.metrics import time
from distributed.utils_test import gen_cluster
from distributed.utils_test import client, cluster_fixture, loop  # noqa F401


@gen_cluster(client=True, nthreads=[("127.0.0.1", 8)] * 2)
async def test_lock(c, s, a, b):
    """Guard a shared ``"locked"`` metadata flag with ``Lock("x")`` across 20 tasks.

    Each task asserts the flag is ``False`` on entry to the locked section,
    sets it to ``True``, sleeps briefly, asserts it is still ``True`` and
    resets it.  Once every task has finished, the scheduler's ``"locks"``
    extension must hold neither events nor ids.
    """
    await c.set_metadata("locked", False)

    def f(x):
        # Obtain the client first, then enter the locked section.
        worker_client = get_client()
        with Lock("x"):
            # Nobody else may have the flag raised while we hold the lock.
            assert worker_client.get_metadata("locked") is False
            worker_client.set_metadata("locked", True)
            sleep(0.05)
            # The flag must not have been touched while we slept.
            assert worker_client.get_metadata("locked") is True
            worker_client.set_metadata("locked", False)

    await c.gather(c.map(f, range(20)))

    lock_extension = s.extensions["locks"]
    assert not lock_extension.events
    assert not lock_extension.ids


@gen_cluster(client=True)
async def test_timeout(c, s, a, b):
    """Check that acquiring an already-held lock with a timeout returns ``False``.

    A first ``Lock("x")`` acquires the name; a second ``Lock("x")`` (with a
    different id) then tries to acquire it with ``timeout=0.1``.  The attempt
    must return ``False`` with less than 0.3 elapsed according to ``time()``,
    and the scheduler's ``"locks"`` extension must still record the first
    lock as the owner with no pending events for ``"x"``.
    """
    extension = s.extensions["locks"]

    holder = Lock("x")
    acquired = await holder.acquire()
    assert acquired is True
    assert extension.ids["x"] == holder.id

    contender = Lock("x")
    assert holder.id != contender.id

    began = time()
    acquired = await contender.acquire(timeout=0.1)
    elapsed = time() - began
    assert elapsed < 0.3
    assert acquired is False
    # Ownership is unchanged and nothing is left waiting on the name.
    assert extension.ids["x"] == holder.id
    assert not extension.events["x"]

    await holder.release()


@gen_cluster(client=True)
async def test_acquires_with_zero_timeout(c, s, a, b):
    """Acquire an uncontended lock with several spellings of ``timeout``.

    With ``timeout=0`` the lock must report ``locked()`` after the acquire.
    The string ``"1s"`` and a one-second ``timedelta`` are then each used for
    a plain acquire/release round trip.
    """
    lock = Lock("x")

    await lock.acquire(timeout=0)
    assert lock.locked()
    await lock.release()

    # Remaining timeout forms: a string, then a timedelta.
    for timeout in ("1s", timedelta(seconds=1)):
        await lock.acquire(timeout=timeout)
        await lock.release()


@gen_cluster(client=True)
async def test_acquires_blocking(c, s, a, b):
    """Exercise ``acquire(blocking=False)`` and its clash with ``timeout``.

    A non-blocking acquire on a free lock must leave it ``locked()``, and
    releasing must clear that state.  Passing both ``blocking=False`` and a
    ``timeout`` must raise ``ValueError``.
    """
    free_lock = Lock("x")

    await free_lock.acquire(blocking=False)
    assert free_lock.locked()

    await free_lock.release()
    assert not free_lock.locked()

    # The call is deliberately not awaited: the ValueError is expected to be
    # raised by the call itself.
    with pytest.raises(ValueError):
        free_lock.acquire(blocking=False, timeout=1)


def test_timeout_sync(client):
    """Synchronous variant: a held ``Lock("x")`` cannot be re-acquired in time.

    While one ``Lock("x")`` is held through the ``with`` statement, a second
    ``Lock("x")`` acquiring with ``timeout=0.1`` must return ``False``.
    """
    with Lock("x"):
        second_attempt = Lock("x").acquire(timeout=0.1)
        assert second_attempt is False


@gen_cluster(client=True)
async def test_errors(c, s, a, b):
    """Releasing a lock that was never acquired must raise ``ValueError``."""
    never_acquired = Lock("x")
    with pytest.raises(ValueError):
        await never_acquired.release()


def test_lock_sync(client):
    """Synchronous variant of the shared-flag test, using 10 tasks.

    Each task enters ``Lock("x")``, asserts the ``"locked"`` metadata flag is
    ``False``, sets it to ``True``, sleeps briefly, asserts it is still
    ``True`` and resets it.
    """

    def f(x):
        with Lock("x"):
            # Look the client up only after the lock has been entered.
            task_client = get_client()
            assert task_client.get_metadata("locked") is False
            task_client.set_metadata("locked", True)
            sleep(0.05)
            # The flag must not have been touched while we slept.
            assert task_client.get_metadata("locked") is True
            task_client.set_metadata("locked", False)

    client.set_metadata("locked", False)
    client.gather(client.map(f, range(10)))


@gen_cluster(client=True)
async def test_lock_types(c, s, a, b):
    """Use an int, a tuple, a list, bytes and a str as lock names.

    For each name the ``Lock`` must expose it unchanged via ``.name`` and
    complete an acquire/release round trip.  Afterwards the scheduler's
    ``"locks"`` extension must have no events left.
    """
    candidate_names = [1, ("a", 1), ["a", 1], b"123", "123"]

    for candidate in candidate_names:
        named_lock = Lock(candidate)
        assert named_lock.name == candidate

        await named_lock.acquire()
        await named_lock.release()

    assert not s.extensions["locks"].events


@gen_cluster(client=True)
async def test_serializable(c, s, a, b):
    """Pass a ``Lock`` to tasks as an argument and round-trip it via pickle.

    Ten tasks receive the same ``Lock("x")`` as a keyword argument and use it
    as a context manager.  A pickled-and-unpickled copy must keep the same
    name and refer to the very same client object.
    """

    def f(x, lock=None):
        with lock:
            assert lock.name == "x"
            return x + 1

    original = Lock("x")
    await c.gather(c.map(f, range(10), lock=original))

    payload = pickle.dumps(original)
    restored = pickle.loads(payload)
    assert restored.name == original.name
    assert restored.client is original.client


@pytest.mark.asyncio
async def test_locks():
    """Use ``Lock`` as an async context manager under an asynchronous ``Client``.

    While ``Lock("x")`` is held through ``async with``, a second ``Lock("x")``
    acquiring with ``timeout=0.1`` must return ``False``.
    """
    async with Client(processes=False, asynchronous=True) as async_client:
        assert async_client.asynchronous
        async with Lock("x"):
            contender = Lock("x")
            acquired = await contender.acquire(timeout=0.1)
            assert acquired is False