# Source: distributed/cli/tests/test_tls_cli.py

from time import sleep

from distributed import Client
from distributed.utils_test import (
    popen,
    get_cert,
    new_config_file,
    tls_security,
    tls_only_config,
)
from distributed.utils_test import loop  # noqa: F401
from distributed.metrics import time


# Paths to the TLS fixture files, each resolved through ``get_cert``.
ca_file, cert, key, keycert = map(
    get_cert,
    ("tls-ca-cert.pem", "tls-cert.pem", "tls-key.pem", "tls-key-cert.pem"),
)


# CLI flags that hand ``keycert`` to --tls-cert and pass no separate --tls-key.
tls_args = [
    "--tls-ca-file",
    ca_file,
    "--tls-cert",
    keycert,
]
# CLI flags that pass ``cert`` and ``key`` as two separate files.
tls_args_2 = [
    "--tls-ca-file",
    ca_file,
    "--tls-cert",
    cert,
    "--tls-key",
    key,
]


def wait_for_cores(c, nthreads=1):
    """Block until the cluster seen by ``c`` reports at least ``nthreads`` threads.

    Parameters
    ----------
    c
        Client-like object whose ``nthreads()`` method returns a mapping
        whose values are per-worker thread counts.
    nthreads : int, optional
        Minimum total number of threads (summed over all reported workers)
        to wait for.  Defaults to 1, i.e. "at least one worker is up".

    Raises
    ------
    AssertionError
        If the requested thread count is still not available once roughly
        10 seconds have passed since the call started.
    """
    deadline = time() + 10
    # Compare against the caller's threshold rather than a hard-coded 1, so
    # that ``nthreads`` is actually honoured.
    while sum(c.nthreads().values()) < nthreads:
        sleep(0.1)
        assert time() < deadline, "timed out waiting for %d thread(s)" % nthreads


def test_basic(loop):
    """Scheduler and worker started with ``tls_args`` accept a TLS client.

    Launches ``dask-scheduler`` and ``dask-worker`` through ``popen`` with the
    ``tls_args`` command-line flags, connects a ``Client`` over ``tls://``
    and waits until the client reports a worker.
    """
    address = "tls://127.0.0.1:8786"
    scheduler_cmd = ["dask-scheduler", "--no-dashboard", *tls_args]
    worker_cmd = ["dask-worker", "--no-dashboard", address, *tls_args]

    # Scheduler is entered first, then the worker; both stay up for the
    # duration of the client session.
    with popen(scheduler_cmd), popen(worker_cmd):
        with Client(address, loop=loop, security=tls_security()) as c:
            wait_for_cores(c)


def test_nanny(loop):
    """Same as the basic TLS check, but the worker is started with ``--nanny``.

    Launches ``dask-scheduler`` and ``dask-worker --nanny`` through ``popen``
    with the ``tls_args`` flags, connects a ``Client`` over ``tls://`` and
    waits until the client reports a worker.
    """
    address = "tls://127.0.0.1:8786"
    scheduler_cmd = ["dask-scheduler", "--no-dashboard", *tls_args]
    worker_cmd = [
        "dask-worker",
        "--no-dashboard",
        "--nanny",
        address,
        *tls_args,
    ]

    # Scheduler is entered first, then the worker; both stay up for the
    # duration of the client session.
    with popen(scheduler_cmd), popen(worker_cmd):
        with Client(address, loop=loop, security=tls_security()) as c:
            wait_for_cores(c)


def test_separate_key_cert(loop):
    """Scheduler and worker started with ``tls_args_2`` accept a TLS client.

    ``tls_args_2`` passes the certificate and the key through separate
    ``--tls-cert`` / ``--tls-key`` flags.  The test launches both processes
    through ``popen``, connects a ``Client`` over ``tls://`` and waits until
    the client reports a worker.
    """
    address = "tls://127.0.0.1:8786"
    scheduler_cmd = ["dask-scheduler", "--no-dashboard", *tls_args_2]
    worker_cmd = ["dask-worker", "--no-dashboard", address, *tls_args_2]

    # Scheduler is entered first, then the worker; both stay up for the
    # duration of the client session.
    with popen(scheduler_cmd), popen(worker_cmd):
        with Client(address, loop=loop, security=tls_security()) as c:
            wait_for_cores(c)


def test_use_config_file(loop):
    """Scheduler and worker started without TLS flags still accept a TLS client.

    No ``--tls-*`` flags are passed on the command line here; instead the
    processes are started inside ``new_config_file(tls_only_config())``.
    The scheduler is given ``--host tls://``, a ``Client`` connects over
    ``tls://`` and the test waits until the client reports a worker.
    """
    address = "tls://127.0.0.1:8786"
    scheduler_cmd = ["dask-scheduler", "--no-dashboard", "--host", "tls://"]
    worker_cmd = ["dask-worker", "--no-dashboard", address]

    # The config-file context is entered before either subprocess is started
    # and is only left after both have been shut down.
    with new_config_file(tls_only_config()):
        with popen(scheduler_cmd), popen(worker_cmd):
            with Client(address, loop=loop, security=tls_security()) as c:
                wait_for_cores(c)