Source code distributed/tests/test_client_loop.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import pytest
from distributed import LocalCluster, Client
from distributed.utils import LoopRunner


# Test if Client stops LoopRunner on close.
@pytest.mark.parametrize("with_own_loop", [True, False])
def test_close_loop_sync(with_own_loop):
    loop_runner = loop = None

    # Setup simple cluster with one threaded worker.
    # Complex setup is not required here since we test only IO loop teardown.
    cluster_params = dict(n_workers=1, dashboard_address=None, processes=False)

    loops_before = LoopRunner._all_loops.copy()

    # Start own loop or use current thread's one.
    if with_own_loop:
        loop_runner = LoopRunner()
        loop_runner.start()
        loop = loop_runner.loop

    with LocalCluster(loop=loop, **cluster_params) as cluster:
        with Client(cluster, loop=loop) as client:
            client.run(max, 1, 2)

    # own loop must be explicitly stopped.
    if with_own_loop:
        loop_runner.stop()

    # Internal loops registry must the same as before cluster running.
    # This means loop runners in LocalCluster and Client correctly stopped.
    # See LoopRunner._stop_unlocked().
    assert loops_before == LoopRunner._all_loops