Source code: distributed/tests/test_priorities.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import asyncio

import pytest

from dask.core import flatten
import dask
from dask import delayed, persist

from distributed.utils_test import gen_cluster, inc, slowinc, slowdec
from distributed import wait, Worker
from distributed.utils import tokey


@gen_cluster(client=True, nthreads=[])
async def test_submit(c, s):
    """A ``priority=1`` submit overtakes an earlier ``priority=-1`` submit.

    The cluster starts with no workers, so nothing can run until the
    single-threaded worker below is started.
    """
    low_prio = c.submit(inc, 1, priority=-1)
    # Filler work for the lone worker; binding it keeps the futures referenced.
    fillers = c.map(slowinc, range(10), delay=0.1)
    high_prio = c.submit(inc, 2, priority=1)

    async with Worker(s.address, nthreads=1):
        await wait(high_prio)
        busy = s.processing.values()
        assert all(busy)
        low_state = s.tasks[low_prio.key].state
        assert low_state == "processing"


@gen_cluster(client=True, nthreads=[])
async def test_map(c, s):
    """A ``priority=1`` batch from ``Client.map`` overtakes a ``priority=-1`` one.

    No workers exist until the single-threaded worker below is started, so
    all three batches are submitted before any task can run.
    """
    low_batch = c.map(inc, [1, 2, 3], priority=-1)
    # Filler work for the lone worker; binding it keeps the futures referenced.
    fillers = c.map(slowinc, range(10), delay=0.1)
    high_batch = c.map(inc, [4, 5, 6], priority=1)

    async with Worker(s.address, nthreads=1):
        await wait(high_batch)
        busy = s.processing.values()
        assert all(busy)
        first_low = low_batch[0]
        assert s.tasks[first_low.key].state == "processing"


@gen_cluster(client=True, nthreads=[])
async def test_compute(c, s):
    """``Client.compute`` honours ``priority`` for dask.array graphs.

    Skipped when ``dask.array`` is not importable.
    """
    da = pytest.importorskip("dask.array")
    shape, chunks = (10, 10), (5, 5)
    x = da.random.random(shape, chunks=chunks)
    y = da.random.random(shape, chunks=chunks)

    low_result = c.compute(x, priority=-1)
    # Filler work for the lone worker; binding it keeps the futures referenced.
    fillers = c.map(slowinc, range(10), delay=0.1)
    high_result = c.compute(y, priority=1)

    async with Worker(s.address, nthreads=1):
        await wait(high_result)
        busy = s.processing.values()
        assert all(busy)
        # Either of these not-yet-finished states is accepted for the
        # low-priority result.
        low_state = s.tasks[tokey(low_result.key)].state
        assert low_state in ("processing", "waiting")


@gen_cluster(client=True, nthreads=[])
async def test_persist(c, s):
    """``Array.persist`` honours ``priority`` for every key of the array.

    Skipped when ``dask.array`` is not importable.
    """
    da = pytest.importorskip("dask.array")
    shape, chunks = (10, 10), (5, 5)
    x = da.random.random(shape, chunks=chunks)
    y = da.random.random(shape, chunks=chunks)

    low_arr = x.persist(priority=-1)
    # Filler work for the lone worker; binding it keeps the futures referenced.
    fillers = c.map(slowinc, range(10), delay=0.1)
    high_arr = y.persist(priority=1)

    async with Worker(s.address, nthreads=1):
        await wait(high_arr)
        busy = s.processing.values()
        assert all(busy)
        unfinished = ("processing", "waiting")
        # Lazily walk the (possibly nested) key structure of the low array.
        low_keys = flatten(low_arr.__dask_keys__())
        assert all(s.tasks[tokey(key)].state in unfinished for key in low_keys)


@gen_cluster(client=True)
async def test_expand_compute(c, s, a, b):
    """``Client.compute`` accepts a dict mapping individual collections to priorities.

    Only ``low_task`` and ``high_task`` get explicit priorities; the slow
    tasks in between keep the workers occupied.
    """
    low_task = delayed(inc)(1)
    slow_tasks = [delayed(slowinc)(i, delay=0.1) for i in range(10)]
    high_task = delayed(inc)(2)

    priorities = {low_task: -1, high_task: 1}
    # All three results stay bound so none of the futures are dropped.
    low, many, high = c.compute(
        [low_task, slow_tasks, high_task], priority=priorities
    )
    await wait(high)
    assert s.tasks[low.key].state == "processing"


@gen_cluster(client=True)
async def test_expand_persist(c, s, a, b):
    """``dask.persist`` accepts a dict mapping individual collections to priorities.

    The two prioritized delayed objects use the explicit keys ``"low"`` and
    ``"high"``; four unprioritized slow tasks are persisted alongside them.
    """
    low_task = delayed(inc)(1, dask_key_name="low")
    slow_tasks = [delayed(slowinc)(i, delay=0.1) for i in range(4)]
    high_task = delayed(inc)(2, dask_key_name="high")

    priorities = {low_task: -1, high_task: 1}
    # Every persisted result stays bound so none of them are dropped.
    low, high, x, y, z, w = persist(
        low_task, high_task, *slow_tasks, priority=priorities
    )
    await wait(high)
    assert s.tasks[low.key].state == "processing"


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
async def test_repeated_persists_same_priority(c, s, w):
    """Two back-to-back ``dask.persist`` calls both make progress.

    ``ys`` and ``zs`` each depend on the same ten ``xs``. Once at least five
    tasks are in memory, at least one ``y`` and at least one ``z`` must be
    among them.
    """
    xs = [delayed(slowinc)(i, delay=0.05, dask_key_name="x-%d" % i) for i in range(10)]
    y_tasks, z_tasks = [], []
    for i, x in enumerate(xs):
        y_tasks.append(delayed(slowinc)(x, delay=0.05, dask_key_name="y-%d" % i))
        z_tasks.append(delayed(slowdec)(x, delay=0.05, dask_key_name="z-%d" % i))

    # Persist order matters: all ys are submitted before any zs.
    persisted_ys = dask.persist(*y_tasks)
    persisted_zs = dask.persist(*z_tasks)

    def n_in_memory():
        return sum(ts.state == "memory" for ts in s.tasks.values())

    # TODO: reduce this number
    while n_in_memory() < 5:
        await asyncio.sleep(0.01)

    assert any(s.tasks[y.key].state == "memory" for y in persisted_ys)
    assert any(s.tasks[z.key].state == "memory" for z in persisted_zs)


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
async def test_last_in_first_out(c, s, w):
    """Dependent chains are completed before every root has run.

    Five ``root -> middle -> leaf`` chains are submitted to a single-threaded
    worker. When the first leaf reaches memory, at least one root must still
    not be in memory.
    """
    roots = [c.submit(slowinc, i, delay=0.05) for i in range(5)]
    middles = [c.submit(slowinc, fut, delay=0.05) for fut in roots]
    leaves = [c.submit(slowinc, fut, delay=0.05) for fut in middles]

    n_total = 15  # 5 roots + 5 middles + 5 leaves

    def any_leaf_done():
        return any(s.tasks[fut.key].state == "memory" for fut in leaves)

    # The length check short-circuits, so leaf keys are only looked up once
    # the scheduler knows about all 15 tasks.
    while len(s.tasks) < n_total or not any_leaf_done():
        await asyncio.sleep(0.01)

    assert any(s.tasks[fut.key].state != "memory" for fut in roots)