# Source: distributed/deploy/tests/test_adaptive_core.py

import asyncio
import pytest

from distributed.deploy.adaptive_core import AdaptiveCore
from distributed.metrics import time


class MyAdaptive(AdaptiveCore):
    """Small AdaptiveCore subclass used as a stand-in by the tests below.

    The raw target is whatever a test stores in ``_target``; workers are
    represented by plain integers held in sets.
    """

    def __init__(self, *args, interval=None, **kwargs):
        super().__init__(*args, interval=interval, **kwargs)
        # NOTE(review): ``_log`` is not read anywhere in the visible part of
        # this file; the tests inspect ``log`` instead.
        self._log = []
        self._target = 0

    async def target(self):
        """Return the value currently stored in ``_target``."""
        return self._target

    async def scale_up(self, n=0):
        """Point ``plan`` and ``requested`` at one shared set ``{0, ..., n - 1}``."""
        fresh = set(range(n))
        # Both attributes deliberately reference the same set object.
        self.plan = fresh
        self.requested = fresh

    async def scale_down(self, workers=()):
        """Discard every entry of ``workers`` from plan, requested and observed."""
        tracked = [self.plan, self.requested, self.observed]
        for pool in tracked:
            for worker in workers:
                pool.discard(worker)


@pytest.mark.asyncio
async def test_safe_target():
    """Check that safe_target() stays within the minimum/maximum bounds."""
    adaptive = MyAdaptive(minimum=1, maximum=4)

    # MyAdaptive starts with a raw target of 0, which is below the minimum.
    low = await adaptive.safe_target()
    assert low == 1

    # A raw target above the maximum is expected to come back as the maximum.
    adaptive._target = 10
    high = await adaptive.safe_target()
    assert high == 4


@pytest.mark.asyncio
async def test_scale_up():
    """Check that adapt() logs an "up" recommendation and fills the plan."""
    adaptive = MyAdaptive(minimum=1, maximum=4)

    # Raw target is 0, minimum is 1: one worker is expected.
    await adaptive.adapt()
    latest = adaptive.log[-1][1]
    assert latest == {"status": "up", "n": 1}
    assert adaptive.plan == {0}

    # Raw target is 10, maximum is 4: four workers are expected.
    adaptive._target = 10
    await adaptive.adapt()
    latest = adaptive.log[-1][1]
    assert latest == {"status": "up", "n": 4}
    assert adaptive.plan == {0, 1, 2, 3}


@pytest.mark.asyncio
async def test_scale_down():
    """Check that a "down" recommendation is logged only on the second call.

    The instance is built with ``wait_count=2``; the assertions expect the
    first adapt() call after lowering the target to log nothing and the second
    one to log a "down" entry naming two workers, including the unobserved one.
    """
    adaptive = MyAdaptive(minimum=1, maximum=4, wait_count=2)

    # Scale up first so there is something to scale down from.
    adaptive._target = 10
    await adaptive.adapt()
    assert len(adaptive.log) == 1

    adaptive.observed = {0, 1, 3}  # worker 2 has not arrived

    adaptive._target = 2
    await adaptive.adapt()
    assert len(adaptive.log) == 1  # first call with the lower target: no new entry
    await adaptive.adapt()
    assert len(adaptive.log) == 2  # second call: a new entry has been logged

    recommendation = adaptive.log[-1][1]
    assert recommendation["status"] == "down"
    assert 2 in recommendation["workers"]
    assert len(recommendation["workers"]) == 2

    # Further calls are expected to leave the log untouched.
    snapshot = list(adaptive.log)
    for _ in range(4):
        await adaptive.adapt()
    assert list(adaptive.log) == snapshot


@pytest.mark.asyncio
async def test_interval():
    """Check that the plan follows ``_target`` until stop() is called.

    The instance is created with ``interval="5 ms"`` and adapt() is never
    called directly here; the test only polls ``plan``.
    """
    adaptive = MyAdaptive(interval="5 ms")
    assert not adaptive.plan

    for wanted in (0, 3, 1):
        deadline = time() + 2  # allow at most two seconds per target
        adaptive._target = wanted
        while len(adaptive.plan) != wanted:
            await asyncio.sleep(0.001)
            assert time() < deadline

    adaptive.stop()
    await asyncio.sleep(0.050)

    # After stop(), changing the target is expected to have no effect.
    adaptive._target = 10
    await asyncio.sleep(0.020)
    assert len(adaptive.plan) == 1  # still the size reached before stop()