# Source: distributed/protocol/tests/test_protocol_utils.py

import pytest

from distributed.protocol.utils import merge_frames, pack_frames, unpack_frames
from distributed.utils import ensure_bytes, is_writeable


@pytest.mark.parametrize(
    "lengths,writeable,frames",
    [
        ([3], [False], [b"123"]),
        ([3], [True], [b"123"]),
        ([3], [None], [b"123"]),
        ([3], [False], [bytearray(b"123")]),
        ([3], [True], [bytearray(b"123")]),
        ([3], [None], [bytearray(b"123")]),
        ([3, 3], [False, False], [b"123", b"456"]),
        ([2, 3, 2], [False, True, None], [b"12345", b"67"]),
        ([2, 3, 2], [False, True, None], [bytearray(b"12345"), bytearray(b"67")]),
        ([5, 2], [False, True], [b"123", b"45", b"67"]),
        ([3, 4], [None, False], [b"12", b"34", b"567"]),
    ],
)
def test_merge_frames(lengths, writeable, frames):
    """Check that ``merge_frames`` regroups ``frames`` as the header requests.

    Parameters
    ----------
    lengths : list of int
        Byte length requested for each merged frame.
    writeable : list of bool or None
        Requested writeability for each merged frame; ``None`` entries
        are not checked.
    frames : list of bytes or bytearray
        Input frames; their concatenation is cut according to ``lengths``
        to build the expected payloads.
    """
    header = {"lengths": lengths, "writeable": writeable}
    result = merge_frames(header, frames)

    # Expected payloads: the concatenated input, cut into consecutive
    # pieces of the requested lengths.
    data = b"".join(frames)
    expected = []
    offset = 0
    for length in lengths:
        expected.append(data[offset : offset + length])
        offset += length

    # Assert per frame. Asserting on a bare generator expression would
    # always pass, because a generator object is truthy regardless of
    # what it would yield.
    for frame, want_writeable in zip(result, writeable):
        if want_writeable is not None:
            assert is_writeable(frame) == want_writeable
    assert list(map(ensure_bytes, result)) == expected


def test_pack_frames():
    """Round-trip a list of frames through ``pack_frames``/``unpack_frames``."""
    original = [b"123", b"asdf"]

    packed = pack_frames(original)
    # The packed form is expected to be a single ``bytes`` object.
    assert isinstance(packed, bytes)

    restored = unpack_frames(packed)
    assert original == restored