Source code distributed/protocol/tests/test_pandas.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import numpy as np
import pandas as pd
import pytest

from dask.dataframe.utils import assert_eq

from distributed.protocol import (
    serialize,
    deserialize,
    decompress,
    dumps,
    loads,
    to_serialize,
)
from distributed.utils import ensure_bytes


# Pandas objects (DataFrames, Series and Index types) used as parametrized
# inputs for the serialize/deserialize round-trip test below.  The entries
# cover a spread of dtypes, column labels and index kinds.
dfs = [
    # --- Small frames: empty, integer, float, non-string label, multi-column ---
    pd.DataFrame({}),
    pd.DataFrame({"x": [1, 2, 3]}),
    pd.DataFrame({"x": [1.0, 2.0, 3.0]}),
    pd.DataFrame({0: [1, 2, 3]}),  # integer column label instead of a string
    pd.DataFrame({"x": [1.0, 2.0, 3.0], "y": [4.0, 5.0, 6.0]}),
    pd.DataFrame({"x": [1.0, 2.0, 3.0]}, index=pd.Index([4, 5, 6], name="bar")),
    # --- Series: unnamed, named, custom index, named index ---
    pd.Series([1.0, 2.0, 3.0]),
    pd.Series([1.0, 2.0, 3.0], name="foo"),
    pd.Series([1.0, 2.0, 3.0], name="foo", index=[4, 5, 6]),
    pd.Series([1.0, 2.0, 3.0], name="foo", index=pd.Index([4, 5, 6], name="bar")),
    # --- Text and bytes values ---
    pd.DataFrame({"x": ["a", "b", "c"]}),
    pd.DataFrame({"x": [b"a", b"b", b"c"]}),
    # --- Categoricals: ordered and unordered columns ---
    pd.DataFrame({"x": pd.Categorical(["a", "b", "a"], ordered=True)}),
    pd.DataFrame({"x": pd.Categorical(["a", "b", "a"], ordered=False)}),
    # --- Bare Index objects ---
    # Categorical index where category "b" is declared but never used.
    pd.Index(pd.Categorical(["a"], categories=["a", "b"], ordered=True)),
    pd.date_range("2000", periods=12, freq="B"),
    pd.RangeIndex(10),
    # Frame filled with the scalar "a", with named row and column indexes.
    pd.DataFrame(
        "a",
        index=pd.Index(["a", "b", "c", "d"], name="a"),
        columns=pd.Index(["A", "B", "C", "D"], name="columns"),
    ),
    # Random float data with string row labels (values are unseeded, so they
    # differ from run to run).
    pd.DataFrame(
        np.random.randn(10, 5), columns=list("ABCDE"), index=list("abcdefghij")
    ),
    # Same shape, but ``where`` keeps only the positive entries, so the frame
    # contains missing values.
    pd.DataFrame(
        np.random.randn(10, 5), columns=list("ABCDE"), index=list("abcdefghij")
    ).where(lambda x: x > 0),
    # Mixed column types in one frame: floats, strings and datetimes.
    pd.DataFrame(
        {
            "a": [0.0, 0.1],
            "B": [0.0, 1.0],
            "C": ["a", "b"],
            "D": pd.to_datetime(["2000", "2001"]),
        }
    ),
    pd.Series(["a", "b", "c"], index=["a", "b", "c"]),
    # --- Time-based indexes: period index, then datetime index ---
    pd.DataFrame(
        np.random.randn(10, 5),
        columns=list("ABCDE"),
        index=pd.period_range("2000", periods=10, freq="B"),
    ),
    pd.DataFrame(
        np.random.randn(10, 5),
        columns=list("ABCDE"),
        index=pd.date_range("2000", periods=10, freq="B"),
    ),
    pd.Series(
        np.random.randn(10), name="a", index=pd.date_range("2000", periods=10, freq="B")
    ),
    # Index of non-ASCII strings.
    pd.Index(["סשםקה7ךשץא", "8טלכז6לרפל"]),
]


@pytest.mark.parametrize("df", dfs)
def test_dumps_serialize_pandas(df):
    """Check that ``df`` survives a serialize/deserialize round trip.

    The object is serialized into a header and frames; if the header
    carries a ``"compression"`` entry the frames are decompressed before
    being handed to ``deserialize``.  The result must compare equal to the
    input according to ``assert_eq``.
    """
    header, raw_frames = serialize(df)

    # Only decompress when the header says compression was applied.
    payload = (
        decompress(header, raw_frames) if "compression" in header else raw_frames
    )

    roundtripped = deserialize(header, payload)
    assert_eq(df, roundtripped)


def test_dumps_pandas_writable():
    """Check that a Series rebuilt from read-only frames can be written to.

    A Series is dumped to frames, every frame is converted with
    ``ensure_bytes`` so the buffers handed to ``loads`` are read-only, and
    the loaded Series is then overwritten in place.  The test verifies both
    that the assignment does not raise and that the new values are actually
    stored.
    """
    a1 = np.arange(1000)
    s1 = pd.Series(a1)
    fs = dumps([to_serialize(s1)])
    # Make all frames read-only
    fs = list(map(ensure_bytes, fs))
    (s2,) = loads(fs)
    assert (s1 == s2).all()

    # Writing must succeed even though the source frames were read-only ...
    s2[...] = 0
    # ... and must take effect, not be silently dropped.
    assert (s2 == 0).all()