Source code distributed/protocol/keras.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from .serialize import dask_serialize, dask_deserialize, serialize, deserialize

import keras


@dask_serialize.register(keras.Model)
def serialize_keras_model(model):
    """Serialize a Keras model into a ``(header, frames)`` pair.

    The header starts as the dict returned by ``model._updated_config()``
    and is extended with two entries describing the weights:

    * ``"headers"``: a tuple with one sub-header per weight array, as
      produced by ``serialize``.
    * ``"nframes"``: a list with the number of frames each weight array
      contributed, so the flat frame list can be split apart again.

    Parameters
    ----------
    model : keras.Model
        The model whose configuration and weights are serialized.

    Returns
    -------
    header : dict
        Model configuration plus the ``"headers"`` and ``"nframes"`` entries.
    frames : list
        The frames of every weight array, concatenated in weight order.

    Raises
    ------
    ImportError
        If the installed Keras is older than 1.2.0.
    """
    import re

    import keras

    # Compare the version numerically. A plain string comparison is
    # lexicographic and would, for example, rank "1.10.0" below "1.2.0".
    numbers = []
    for piece in keras.__version__.split(".")[:3]:
        # Keep only the leading digits so suffixes such as "0rc1" still parse.
        leading = re.match(r"\d*", piece).group()
        numbers.append(int(leading) if leading else 0)
    # Pad short versions ("2" or "1.2") so the tuple comparison is like-for-like.
    numbers += [0] * (3 - len(numbers))
    if tuple(numbers) < (1, 2, 0):
        raise ImportError(
            "Need Keras >= 1.2.0. Try python -m pip install keras --upgrade --no-deps"
        )

    header = model._updated_config()
    weights = model.get_weights()
    # Build from a list of pairs rather than unpacking ``zip(*...)``: the
    # latter raises ValueError for a model that has no weights at all.
    serialized = [serialize(weight) for weight in weights]
    header["headers"] = tuple(sub_header for sub_header, _ in serialized)
    header["nframes"] = [len(sub_frames) for _, sub_frames in serialized]
    frames = [frame for _, sub_frames in serialized for frame in sub_frames]
    return header, frames


@dask_deserialize.register(keras.Model)
def deserialize_keras_model(header, frames):
    """Rebuild a Keras model from a ``(header, frames)`` pair.

    Parameters
    ----------
    header : dict
        Model configuration accepted by ``keras.models.model_from_config``,
        carrying ``"headers"`` (one sub-header per weight array) and
        ``"nframes"`` (how many frames belong to each weight array).
    frames : list
        Flat list of frames for all weight arrays, in weight order.

    Returns
    -------
    keras.Model
        A model built from the configuration with the deserialized weights
        applied through ``set_weights``.
    """
    from keras.models import model_from_config

    restored = []
    start = 0
    # Walk the flat frame list, slicing out each weight's own run of frames.
    for sub_header, count in zip(header["headers"], header["nframes"]):
        stop = start + count
        restored.append(deserialize(sub_header, frames[start:stop]))
        start = stop

    rebuilt = model_from_config(header)
    rebuilt.set_weights(restored)
    return rebuilt