Source code distributed/tests/test_threadpoolexecutor.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from time import sleep
import threading

from distributed.metrics import time
from distributed.threadpoolexecutor import ThreadPoolExecutor, secede, rejoin


def test_tpe():
    """Exercise a two-thread executor around a task that calls ``secede()``.

    Asserts that two threads exist after a first batch of sleeps, that after
    a seceding task plus a second batch the union of the original and the
    current thread sets has three members, and that within one second at
    least one of the original threads is no longer alive.
    """
    with ThreadPoolExecutor(2) as e:
        list(e.map(sleep, [0.01] * 4))

        # Snapshot the thread set so later changes to e._threads are visible.
        original_threads = e._threads.copy()
        assert len(original_threads) == 2

        def secede_and_return():
            secede()
            return 1

        assert e.submit(secede_and_return).result() == 1

        list(e.map(sleep, [0.01] * 4))
        assert len(original_threads | e._threads) == 3

        # Poll until one of the original threads has died, failing the test
        # if that takes a second or more.
        deadline = time() + 1
        while all(t.is_alive() for t in original_threads):
            sleep(0.01)
            assert time() < deadline


def test_shutdown_timeout():
    """A plain ``shutdown()`` takes more than 0.1 s while sleeps are queued.

    A 0.1 s and a 0.2 s sleep are submitted to an executor built with ``1``
    before the shutdown call is timed.
    """
    e = ThreadPoolExecutor(1)
    futures = [e.submit(sleep, delay) for delay in (0.1, 0.2)]
    # Brief pause before timing (presumably so the first task has started).
    sleep(0.01)

    began = time()
    e.shutdown()
    elapsed = time() - began
    assert elapsed > 0.1


def test_shutdown_timeout_raises():
    """Time ``shutdown(timeout=0.1)`` while 0.1 s and 0.2 s sleeps are queued.

    The only assertion is that the call takes longer than 0.05 s; despite
    the test's name, no exception is checked for here.
    """
    e = ThreadPoolExecutor(1)
    futures = [e.submit(sleep, delay) for delay in (0.1, 0.2)]
    # Pause before timing the shutdown call.
    sleep(0.05)

    began = time()
    e.shutdown(timeout=0.1)
    elapsed = time() - began
    assert elapsed > 0.05


def test_shutdown_wait():
    """``shutdown(wait=False)`` returns in under 1 s with a 1 s sleep submitted."""
    e = ThreadPoolExecutor(1)
    pending = e.submit(sleep, 1)
    # Brief pause before timing (presumably so the task has started).
    sleep(0.01)

    began = time()
    e.shutdown(wait=False)
    elapsed = time() - began
    assert elapsed < 1


def test_secede_rejoin_busy():
    """Secede and rejoin while ten 0.2 s sleeps are also submitted.

    The seceding task checks its own membership in ``e._threads`` before
    ``secede()``, after it, and after ``rejoin()``.  The test then asserts
    that the task's result arrives between 0.1 s and 0.3 s after timing
    starts, that the executor still has two threads, and that the thread
    which rejoined goes on to run at least one of ten later tasks.
    """
    with ThreadPoolExecutor(2) as e:

        def secede_then_rejoin():
            me = threading.current_thread()
            assert me in e._threads
            secede()
            sleep(0.1)
            assert me not in e._threads
            rejoin()
            assert len(e._threads) == 2
            assert me in e._threads
            return me

        rejoining = e.submit(secede_then_rejoin)
        backlog = [e.submit(sleep, 0.2) for _ in range(10)]
        began = time()
        special_thread = rejoining.result()
        waited = time() - began

        assert 0.1 < waited < 0.3

        assert len(e._threads) == 2
        assert special_thread in e._threads

        def report_thread():
            sleep(0.01)
            return threading.current_thread()

        # Submit everything first, then gather which threads ran the tasks.
        followups = [e.submit(report_thread) for _ in range(10)]
        threads_seen = {followup.result() for followup in followups}
        assert special_thread in threads_seen


def test_secede_rejoin_quiet():
    """Secede and rejoin when no other tasks have been submitted.

    The task checks its own membership in ``e._threads`` before ``secede()``,
    after it, and after ``rejoin()``, and that two threads are registered
    once it has rejoined.
    """
    with ThreadPoolExecutor(2) as e:

        def secede_then_rejoin():
            me = threading.current_thread()
            assert me in e._threads
            secede()
            sleep(0.1)
            assert me not in e._threads
            rejoin()
            assert len(e._threads) == 2
            assert me in e._threads
            return me

        # Wait for the task to finish; result() is expected to surface any
        # exception raised inside it.
        e.submit(secede_then_rejoin).result()


def test_rejoin_idempotent():
    """Calling ``rejoin()`` five times after one ``secede()`` still completes.

    The task secedes once, calls ``rejoin()`` five times in a row and
    returns 1.  The return value is asserted so the test confirms the task
    ran to its end, rather than fetching the result and discarding it.
    """
    with ThreadPoolExecutor(2) as e:

        def secede_then_rejoin_repeatedly():
            secede()
            for _ in range(5):
                rejoin()
            return 1

        assert e.submit(secede_then_rejoin_repeatedly).result() == 1


def test_thread_name():
    """The executor's threads carry two distinct names after ten tasks are mapped."""
    with ThreadPoolExecutor(2) as e:
        # The iterator returned by map() is intentionally left unconsumed.
        e.map(id, range(10))
        distinct_names = {worker.name for worker in e._threads}
        assert len(distinct_names) == 2