import asyncio
import pytest
from dask.core import flatten
import dask
from dask import delayed, persist
from distributed.utils_test import gen_cluster, inc, slowinc, slowdec
from distributed import wait, Worker
from distributed.utils import tokey
@gen_cluster(client=True, nthreads=[])
async def test_submit(c, s):
    """A priority=1 submit completes while an earlier priority=-1 submit
    is still outstanding, despite background load."""
    low = c.submit(inc, 1, priority=-1)
    fillers = c.map(slowinc, range(10), delay=0.1)  # keep the lone worker busy
    high = c.submit(inc, 2, priority=1)
    async with Worker(s.address, nthreads=1):
        await wait(high)
        # The worker is still churning through tasks...
        assert all(s.processing.values())
        # ...and the low-priority task has not completed yet.
        assert s.tasks[low.key].state == "processing"
@gen_cluster(client=True, nthreads=[])
async def test_map(c, s):
    """High-priority mapped futures finish ahead of a low-priority batch
    that was queued first."""
    low = c.map(inc, [1, 2, 3], priority=-1)
    fillers = c.map(slowinc, range(10), delay=0.1)  # background load
    high = c.map(inc, [4, 5, 6], priority=1)
    async with Worker(s.address, nthreads=1):
        await wait(high)
        # Worker still busy; the low batch must not have finished.
        assert all(s.processing.values())
        assert s.tasks[low[0].key].state == "processing"
@gen_cluster(client=True, nthreads=[])
async def test_compute(c, s):
    """Client.compute honors the priority= keyword: the high-priority
    array finishes while the low-priority one is still pending."""
    da = pytest.importorskip("dask.array")
    arr_a = da.random.random((10, 10), chunks=(5, 5))
    arr_b = da.random.random((10, 10), chunks=(5, 5))
    low = c.compute(arr_a, priority=-1)
    fillers = c.map(slowinc, range(10), delay=0.1)  # background load
    high = c.compute(arr_b, priority=1)
    async with Worker(s.address, nthreads=1):
        await wait(high)
        assert all(s.processing.values())
        # Low-priority result is still in flight on the scheduler.
        assert s.tasks[tokey(low.key)].state in {"processing", "waiting"}
@gen_cluster(client=True, nthreads=[])
async def test_persist(c, s):
    """Collection.persist honors priority=: every key of the low-priority
    persist is still unfinished once the high-priority one completes."""
    da = pytest.importorskip("dask.array")
    arr_a = da.random.random((10, 10), chunks=(5, 5))
    arr_b = da.random.random((10, 10), chunks=(5, 5))
    low = arr_a.persist(priority=-1)
    fillers = c.map(slowinc, range(10), delay=0.1)  # background load
    high = arr_b.persist(priority=1)
    async with Worker(s.address, nthreads=1):
        await wait(high)
        assert all(s.processing.values())
        # Check each chunk of the low-priority collection individually.
        for key in flatten(low.__dask_keys__()):
            assert s.tasks[tokey(key)].state in {"processing", "waiting"}
@gen_cluster(client=True)
async def test_expand_compute(c, s, a, b):
    """A priority dict keyed by delayed objects expands to the computed
    futures: the low-priority one is still running after high finishes."""
    low = delayed(inc)(1)
    background = [delayed(slowinc)(i, delay=0.1) for i in range(10)]
    high = delayed(inc)(2)
    low, background, high = c.compute(
        [low, background, high], priority={low: -1, high: 1}
    )
    await wait(high)
    assert s.tasks[low.key].state == "processing"
@gen_cluster(client=True)
async def test_expand_persist(c, s, a, b):
    """Same priority-dict expansion through dask.persist: the named
    low-priority task remains in flight once high has finished."""
    low = delayed(inc)(1, dask_key_name="low")
    background = [delayed(slowinc)(i, delay=0.1) for i in range(4)]
    high = delayed(inc)(2, dask_key_name="high")
    low, high, x, y, z, w = persist(
        low, high, *background, priority={low: -1, high: 1}
    )
    await wait(high)
    assert s.tasks[low.key].state == "processing"
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
async def test_repeated_persists_same_priority(c, s, w):
    """Two persist calls at equal priority interleave: once a handful of
    tasks are in memory, both families have made progress."""
    roots = [
        delayed(slowinc)(i, delay=0.05, dask_key_name="x-%d" % i) for i in range(10)
    ]
    incs = [
        delayed(slowinc)(r, delay=0.05, dask_key_name="y-%d" % i)
        for i, r in enumerate(roots)
    ]
    decs = [
        delayed(slowdec)(r, delay=0.05, dask_key_name="z-%d" % i)
        for i, r in enumerate(roots)
    ]
    incs = dask.persist(*incs)
    decs = dask.persist(*decs)
    # Wait until enough work has completed to judge interleaving.
    while sum(t.state == "memory" for t in s.tasks.values()) < 5:  # TODO: reduce this number
        await asyncio.sleep(0.01)
    # Neither persist call starved the other.
    assert any(s.tasks[f.key].state == "memory" for f in incs)
    assert any(s.tasks[f.key].state == "memory" for f in decs)
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
async def test_last_in_first_out(c, s, w):
    """Depth-first tendency: some top-of-chain tasks reach memory before
    the entire bottom tier has completed."""
    base = [c.submit(slowinc, i, delay=0.05) for i in range(5)]
    mid = [c.submit(slowinc, f, delay=0.05) for f in base]
    top = [c.submit(slowinc, f, delay=0.05) for f in mid]
    # Wait until all 15 tasks are known and at least one top task is done.
    while not (
        len(s.tasks) >= 15 and any(s.tasks[f.key].state == "memory" for f in top)
    ):
        await asyncio.sleep(0.01)
    # The bottom tier must not be fully finished at that point.
    assert not all(s.tasks[f.key].state == "memory" for f in base)
|