|   1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132 | import struct
from ..utils import nbytes
BIG_BYTES_SHARD_SIZE = 2 ** 26
msgpack_opts = {
    ("max_%s_len" % x): 2 ** 31 - 1 for x in ["str", "bin", "array", "map", "ext"]
}
msgpack_opts["strict_map_key"] = False
msgpack_opts["raw"] = False
def frame_split_size(frame, n=BIG_BYTES_SHARD_SIZE) -> list:
    """
    Split a frame into a list of frames of maximum size
    This helps us to avoid passing around very large bytestrings.
    Examples
    --------
    >>> frame_split_size([b'12345', b'678'], n=3)  # doctest: +SKIP
    [b'123', b'45', b'678']
    """
    frame = memoryview(frame)
    if frame.nbytes <= n:
        return [frame]
    nitems = frame.nbytes // frame.itemsize
    items_per_shard = n // frame.itemsize
    return [frame[i : i + items_per_shard] for i in range(0, nitems, items_per_shard)]
def merge_frames(header, frames):
    """Merge frames into original lengths
    Examples
    --------
    >>> merge_frames({'lengths': [3, 3]}, [b'123456'])
    [b'123', b'456']
    >>> merge_frames({'lengths': [6]}, [b'123', b'456'])
    [b'123456']
    """
    lengths = list(header["lengths"])
    writeables = list(header["writeable"])
    assert len(lengths) == len(writeables)
    assert sum(lengths) == sum(map(nbytes, frames))
    if all(len(f) == l for f, l in zip(frames, lengths)):
        return [
            (bytearray(f) if w else bytes(f)) if w == memoryview(f).readonly else f
            for w, f in zip(header["writeable"], frames)
        ]
    frames = frames[::-1]
    lengths = lengths[::-1]
    writeables = writeables[::-1]
    out = []
    while lengths:
        l = lengths.pop()
        w = writeables.pop()
        L = []
        while l:
            frame = frames.pop()
            if nbytes(frame) <= l:
                L.append(frame)
                l -= nbytes(frame)
            else:
                frame = memoryview(frame)
                L.append(frame[:l])
                frames.append(frame[l:])
                l = 0
        if len(L) == 1 and w != memoryview(L[0]).readonly:  # no work necessary
            out.extend(L)
        elif w:
            out.append(bytearray().join(L))
        else:
            out.append(bytes().join(L))
    return out
def pack_frames_prelude(frames):
    nframes = len(frames)
    nbytes_frames = map(nbytes, frames)
    return struct.pack(f"Q{nframes}Q", nframes, *nbytes_frames)
def pack_frames(frames):
    """Pack frames into a byte-like object
    This prepends length information to the front of the bytes-like object
    See Also
    --------
    unpack_frames
    """
    return b"".join([pack_frames_prelude(frames), *frames])
def unpack_frames(b):
    """Unpack bytes into a sequence of frames
    This assumes that length information is at the front of the bytestring,
    as performed by pack_frames
    See Also
    --------
    pack_frames
    """
    b = memoryview(b)
    fmt = "Q"
    fmt_size = struct.calcsize(fmt)
    (n_frames,) = struct.unpack_from(fmt, b)
    lengths = struct.unpack_from(f"{n_frames}{fmt}", b, fmt_size)
    frames = []
    start = fmt_size * (1 + n_frames)
    for length in lengths:
        end = start + length
        frame = b[start:end]
        frames.append(frame)
        start = end
    return frames
 |