Source code distributed/diagnostics/tests/test_progress_stream.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import pytest

pytest.importorskip("bokeh")

from dask import delayed
from distributed.client import wait
from distributed.diagnostics.progress_stream import progress_quads, progress_stream
from distributed.utils_test import div, gen_cluster, inc


def test_progress_quads():
    msg = {
        "all": {"inc": 5, "dec": 1, "add": 4},
        "memory": {"inc": 2, "dec": 0, "add": 1},
        "erred": {"inc": 0, "dec": 1, "add": 0},
        "released": {"inc": 1, "dec": 0, "add": 1},
        "processing": {"inc": 1, "dec": 0, "add": 2},
    }

    d = progress_quads(msg, nrows=2)
    color = d.pop("color")
    assert len(set(color)) == 3
    expected = {
        "name": ["inc", "add", "dec"],
        "show-name": ["inc", "add", "dec"],
        "left": [0, 0, 1],
        "right": [0.9, 0.9, 1.9],
        "top": [0, -1, 0],
        "bottom": [-0.8, -1.8, -0.8],
        "all": [5, 4, 1],
        "released": [1, 1, 0],
        "memory": [2, 1, 0],
        "erred": [0, 0, 1],
        "processing": [1, 2, 0],
        "done": ["3 / 5", "2 / 4", "1 / 1"],
        "released-loc": [0.9 * 1 / 5, 0.25 * 0.9, 1.0],
        "memory-loc": [0.9 * 3 / 5, 0.5 * 0.9, 1.0],
        "erred-loc": [0.9 * 3 / 5, 0.5 * 0.9, 1.9],
        "processing-loc": [0.9 * 4 / 5, 1 * 0.9, 1 * 0.9 + 1],
    }
    assert d == expected


def test_progress_quads_too_many():
    keys = ["x-%d" % i for i in range(1000)]
    msg = {
        "all": {k: 1 for k in keys},
        "memory": {k: 0 for k in keys},
        "erred": {k: 0 for k in keys},
        "released": {k: 0 for k in keys},
        "processing": {k: 0 for k in keys},
    }

    d = progress_quads(msg, nrows=6, ncols=3)
    assert len(d["name"]) == 6 * 3


@gen_cluster(client=True)
async def test_progress_stream(c, s, a, b):
    futures = c.map(div, [1] * 10, range(10))

    x = 1
    for i in range(5):
        x = delayed(inc)(x)
    future = c.compute(x)

    await wait(futures + [future])

    comm = await progress_stream(s.address, interval=0.010)
    msg = await comm.read()
    nbytes = msg.pop("nbytes")
    assert msg == {
        "all": {"div": 10, "inc": 5},
        "erred": {"div": 1},
        "memory": {"div": 9, "inc": 1},
        "released": {"inc": 4},
        "processing": {},
    }
    assert set(nbytes) == set(msg["all"])
    assert all(v > 0 for v in nbytes.values())

    assert progress_quads(msg)

    await comm.close()


def test_progress_quads_many_functions():
    funcnames = ["fn%d" % i for i in range(1000)]
    msg = {
        "all": {fn: 1 for fn in funcnames},
        "memory": {fn: 1 for fn in funcnames},
        "erred": {fn: 0 for fn in funcnames},
        "released": {fn: 0 for fn in funcnames},
        "processing": {fn: 0 for fn in funcnames},
    }

    d = progress_quads(msg, nrows=2)
    color = d.pop("color")
    assert len(set(color)) < 100