Source code distributed/diagnostics/tests/test_graph_layout.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import asyncio
import operator

from distributed.utils_test import gen_cluster, inc
from distributed.diagnostics import GraphLayout
from distributed import wait


@gen_cluster(client=True)
async def test_basic(c, s, a, b):
    gl = GraphLayout(s)
    futures = c.map(inc, range(5))
    total = c.submit(sum, futures)

    await total

    assert len(gl.x) == len(gl.y) == 6
    assert all(gl.x[f.key] == 0 for f in futures)
    assert gl.x[total.key] == 1
    assert min(gl.y.values()) < gl.y[total.key] < max(gl.y.values())


@gen_cluster(client=True)
async def test_construct_after_call(c, s, a, b):
    futures = c.map(inc, range(5))
    total = c.submit(sum, futures)

    await total

    gl = GraphLayout(s)

    assert len(gl.x) == len(gl.y) == 6
    assert all(gl.x[f.key] == 0 for f in futures)
    assert gl.x[total.key] == 1
    assert min(gl.y.values()) < gl.y[total.key] < max(gl.y.values())


@gen_cluster(client=True)
async def test_states(c, s, a, b):
    gl = GraphLayout(s)
    futures = c.map(inc, range(5))
    total = c.submit(sum, futures)
    del futures

    await total

    updates = {state for idx, state in gl.state_updates}
    assert "memory" in updates
    assert "processing" in updates
    assert "released" in updates


@gen_cluster(client=True)
async def test_release_tasks(c, s, a, b):
    gl = GraphLayout(s)
    futures = c.map(inc, range(5))
    total = c.submit(sum, futures)

    await total
    key = total.key
    del total
    while key in s.tasks:
        await asyncio.sleep(0.01)

    assert len(gl.visible_updates) == 1
    assert len(gl.visible_edge_updates) == 5


@gen_cluster(client=True)
async def test_forget(c, s, a, b):
    gl = GraphLayout(s)

    futures = c.map(inc, range(10))
    futures = c.map(inc, futures)
    await wait(futures)
    del futures
    while s.tasks:
        await asyncio.sleep(0.01)

    assert not gl.x
    assert not gl.y
    assert not gl.index
    assert not gl.index_edge
    assert not gl.collision


@gen_cluster(client=True)
async def test_unique_positions(c, s, a, b):
    gl = GraphLayout(s)

    x = c.submit(inc, 1)
    ys = [c.submit(operator.add, x, i) for i in range(5)]
    await wait(ys)

    y_positions = [(gl.x[k], gl.y[k]) for k in gl.x]
    assert len(y_positions) == len(set(y_positions))