Source code distributed/diagnostics/tests/test_progressbar.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from time import sleep

import pytest

from distributed import Scheduler, Worker
from distributed.diagnostics.progressbar import TextProgressBar, progress
from distributed.metrics import time
from distributed.utils_test import inc, div, gen_cluster
from distributed.utils_test import client, loop, cluster_fixture  # noqa: F401


def test_text_progressbar(capsys, client):
    futures = client.map(inc, range(10))
    p = TextProgressBar(futures, interval=0.01, complete=True)
    client.gather(futures)

    start = time()
    while p.status != "finished":
        sleep(0.01)
        assert time() - start < 5

    check_bar_completed(capsys)
    assert p._last_response == {"all": 10, "remaining": 0, "status": "finished"}
    assert p.comm.closed()


@gen_cluster(client=True)
async def test_TextProgressBar_error(c, s, a, b):
    x = c.submit(div, 1, 0)

    progress = TextProgressBar([x.key], scheduler=s.address, start=False, interval=0.01)
    await progress.listen()

    assert progress.status == "error"
    assert progress.comm.closed()

    progress = TextProgressBar([x.key], scheduler=s.address, start=False, interval=0.01)
    await progress.listen()
    assert progress.status == "error"
    assert progress.comm.closed()


@pytest.mark.asyncio
async def test_TextProgressBar_empty(capsys):
    async with Scheduler(port=0) as s:
        async with Worker(s.address, nthreads=1) as a:
            async with Worker(s.address, nthreads=1) as b:
                progress = TextProgressBar(
                    [], scheduler=s.address, start=False, interval=0.01
                )
                await progress.listen()

                assert progress.status == "finished"
                check_bar_completed(capsys)


def check_bar_completed(capsys, width=40):
    out, err = capsys.readouterr()
    # trailing newline so grab next to last line for final state of bar
    bar, percent, time = [i.strip() for i in out.split("\r")[-2].split("|")]
    assert bar == "[" + "#" * width + "]"
    assert percent == "100% Completed"


def test_progress_function(client, capsys):
    f = client.submit(lambda: 1)
    g = client.submit(lambda: 2)

    progress([[f], [[g]]], notebook=False)
    check_bar_completed(capsys)

    progress(f)
    check_bar_completed(capsys)


def test_progress_function_w_kwargs(client, capsys):
    f = client.submit(lambda: 1)
    g = client.submit(lambda: 2)

    progress(f, interval="20ms")
    check_bar_completed(capsys)