# Source: distributed/cli/tests/test_dask_spec.py

import pytest
import sys
import yaml

from distributed import Client
from distributed.utils_test import popen
from distributed.utils_test import cleanup  # noqa: F401


@pytest.mark.asyncio
async def test_text(cleanup):
    """Start a scheduler and a worker from inline ``--spec`` JSON text.

    Both processes are launched with ``python -m distributed.cli.dask_spec``.
    A client then connects, waits for one worker, and verifies that the only
    worker known to the scheduler carries the name and thread count that were
    given in the worker spec.
    """
    address = "tcp://localhost:9373"
    scheduler_cmd = [
        sys.executable,
        "-m",
        "distributed.cli.dask_spec",
        "--spec",
        '{"cls": "dask.distributed.Scheduler", "opts": {"port": 9373}}',
    ]
    worker_cmd = [
        sys.executable,
        "-m",
        "distributed.cli.dask_spec",
        address,
        "--spec",
        '{"cls": "dask.distributed.Worker", "opts": {"nanny": false, "nthreads": 3, "name": "foo"}}',
    ]

    # The scheduler process is entered first and exited last, exactly as with
    # two nested ``with`` blocks.
    with popen(scheduler_cmd), popen(worker_cmd):
        async with Client(address, asynchronous=True) as client:
            await client.wait_for_workers(1)
            identity = await client.scheduler.identity()
            # Unpacking fails unless exactly one worker is registered.
            (worker_info,) = identity["workers"].values()
            assert worker_info["name"] == "foo"
            assert worker_info["nthreads"] == 3


@pytest.mark.asyncio
async def test_file(cleanup, tmp_path):
    """Start a worker from a YAML ``--spec-file`` and check its identity.

    The worker spec is dumped to ``foo.yaml`` under ``tmp_path``. A scheduler
    is started with the ``dask-scheduler`` command, the worker is launched via
    ``python -m distributed.cli.dask_spec --spec-file``, and a client verifies
    that the only registered worker has the name and thread count from the
    spec file.
    """
    address = "tcp://localhost:9373"
    worker_spec = {
        "cls": "dask.distributed.Worker",
        "opts": {"nanny": False, "nthreads": 3, "name": "foo"},
    }
    spec_path = tmp_path / "foo.yaml"
    with spec_path.open("w") as stream:
        yaml.dump(worker_spec, stream)

    scheduler_cmd = ["dask-scheduler", "--port", "9373", "--no-dashboard"]
    worker_cmd = [
        sys.executable,
        "-m",
        "distributed.cli.dask_spec",
        address,
        "--spec-file",
        str(spec_path),
    ]

    # The scheduler process is entered first and exited last, exactly as with
    # two nested ``with`` blocks.
    with popen(scheduler_cmd), popen(worker_cmd):
        async with Client(address, asynchronous=True) as client:
            await client.wait_for_workers(1)
            identity = await client.scheduler.identity()
            # Unpacking fails unless exactly one worker is registered.
            (worker_info,) = identity["workers"].values()
            assert worker_info["name"] == "foo"
            assert worker_info["nthreads"] == 3


def test_errors():
    """Check the message printed for invalid ``--spec``/``--spec-file`` usage.

    Two invocations are exercised in order: one passing both ``--spec`` and
    ``--spec-file``, and one passing neither. In each case the first line the
    process writes to stdout must mention "exactly one" along with both
    option names.
    """
    base_cmd = [sys.executable, "-m", "distributed.cli.dask_spec"]
    invalid_arg_sets = (
        # Both options supplied at once.
        ["--spec", '{"foo": "bar"}', "--spec-file", "foo.yaml"],
        # Neither option supplied.
        [],
    )

    for extra_args in invalid_arg_sets:
        with popen(base_cmd + extra_args) as proc:
            first_line = proc.stdout.readline().decode()
            assert "exactly one" in first_line
            assert "--spec" in first_line and "--spec-file" in first_line