# Source: distributed/deploy/utils_test.py
from ..client import Client

import pytest


class ClusterTest:
    """Reusable test suite for cluster implementations.

    Concrete suites set ``Cluster`` to the class under test and may set
    ``kwargs`` to extra keyword arguments forwarded to every ``Cluster(...)``
    call made here.  The ``setUp``/``tearDown`` names follow the unittest
    fixture convention -- presumably this class is mixed into a
    ``unittest.TestCase`` subclass; confirm against the concrete suites.
    """

    # Cluster class under test; must be overridden before the tests can run.
    Cluster = None
    # Extra keyword arguments for the cluster constructor.  This dict is shared
    # by every subclass that does not override it, so replace it rather than
    # mutating it in place.
    kwargs = {}

    def setUp(self):
        """Start a cluster and connect a client to its scheduler address."""
        # NOTE(review): the leading positional argument looks like a worker
        # count (test_cores expects 2 entries, test_no_workers passes 0) --
        # confirm against the Cluster signature.
        self.cluster = self.Cluster(2, scheduler_port=0, **self.kwargs)
        try:
            self.client = Client(self.cluster.scheduler_address)
        except BaseException:
            # unittest-style runners skip tearDown when setUp raises, so the
            # cluster that was just started must be released here.
            self.cluster.close()
            raise

    def tearDown(self):
        """Close the client, then the cluster, even if the client close fails."""
        try:
            self.client.close()
        finally:
            # Always release the cluster; otherwise a failing client shutdown
            # would leave it running for the rest of the test session.
            self.cluster.close()

    @pytest.mark.xfail()
    def test_cores(self):
        """The client should report exactly two entries from ``nthreads()``."""
        # The result of scheduler_info() was never inspected; the call itself
        # is kept so the test still issues the same request as before.
        self.client.scheduler_info()
        assert len(self.client.nthreads()) == 2

    def test_submit(self):
        """A submitted task should run and return its result."""
        future = self.client.submit(lambda x: x + 1, 1)
        assert future.result() == 2

    def test_context_manager(self):
        """Cluster and Client should both work as context managers."""
        # NOTE(review): unlike the other tests, no scheduler_port=0 is passed
        # here, so the cluster is built with its default port -- confirm this
        # is intentional.
        with self.Cluster(**self.kwargs) as c:
            with Client(c) as e:
                assert e.nthreads()

    def test_no_workers(self):
        """Constructing and closing a cluster with ``0`` as first argument should work."""
        with self.Cluster(0, scheduler_port=0, **self.kwargs):
            pass