1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 | from time import sleep
import threading
from distributed.metrics import time
from distributed.threadpoolexecutor import ThreadPoolExecutor, secede, rejoin
def test_tpe():
with ThreadPoolExecutor(2) as e:
list(e.map(sleep, [0.01] * 4))
threads = e._threads.copy()
assert len(threads) == 2
def f():
secede()
return 1
assert e.submit(f).result() == 1
list(e.map(sleep, [0.01] * 4))
assert len(threads | e._threads) == 3
start = time()
while all(t.is_alive() for t in threads):
sleep(0.01)
assert time() < start + 1
def test_shutdown_timeout():
e = ThreadPoolExecutor(1)
futures = [e.submit(sleep, 0.1 * i) for i in range(1, 3, 1)]
sleep(0.01)
start = time()
e.shutdown()
end = time()
assert end - start > 0.1
def test_shutdown_timeout_raises():
e = ThreadPoolExecutor(1)
futures = [e.submit(sleep, 0.1 * i) for i in range(1, 3, 1)]
sleep(0.05)
start = time()
e.shutdown(timeout=0.1)
end = time()
assert end - start > 0.05
def test_shutdown_wait():
e = ThreadPoolExecutor(1)
future = e.submit(sleep, 1)
sleep(0.01)
start = time()
e.shutdown(wait=False)
end = time()
assert end - start < 1
def test_secede_rejoin_busy():
with ThreadPoolExecutor(2) as e:
def f():
assert threading.current_thread() in e._threads
secede()
sleep(0.1)
assert threading.current_thread() not in e._threads
rejoin()
assert len(e._threads) == 2
assert threading.current_thread() in e._threads
return threading.current_thread()
future = e.submit(f)
L = [e.submit(sleep, 0.2) for i in range(10)]
start = time()
special_thread = future.result()
stop = time()
assert 0.1 < stop - start < 0.3
assert len(e._threads) == 2
assert special_thread in e._threads
def f():
sleep(0.01)
return threading.current_thread()
futures = [e.submit(f) for _ in range(10)]
assert special_thread in {future.result() for future in futures}
def test_secede_rejoin_quiet():
with ThreadPoolExecutor(2) as e:
def f():
assert threading.current_thread() in e._threads
secede()
sleep(0.1)
assert threading.current_thread() not in e._threads
rejoin()
assert len(e._threads) == 2
assert threading.current_thread() in e._threads
return threading.current_thread()
future = e.submit(f)
result = future.result()
def test_rejoin_idempotent():
with ThreadPoolExecutor(2) as e:
def f():
secede()
for i in range(5):
rejoin()
return 1
future = e.submit(f)
result = future.result()
def test_thread_name():
with ThreadPoolExecutor(2) as e:
e.map(id, range(10))
assert len({thread.name for thread in e._threads}) == 2
|