Source code django/contrib/gis/geos/prototypes/io.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
import threading
from ctypes import POINTER, Structure, byref, c_byte, c_char_p, c_int, c_size_t

from django.contrib.gis.geos.base import GEOSBase
from django.contrib.gis.geos.libgeos import (
    GEOM_PTR, GEOSFuncFactory, geos_version_tuple,
)
from django.contrib.gis.geos.prototypes.errcheck import (
    check_geom, check_sized_string, check_string,
)
from django.contrib.gis.geos.prototypes.geom import c_uchar_p, geos_char_p
from django.utils.encoding import force_bytes


# ### The WKB/WKT Reader/Writer structures and pointers ###
class WKTReader_st(Structure):
    pass


class WKTWriter_st(Structure):
    pass


class WKBReader_st(Structure):
    pass


class WKBWriter_st(Structure):
    pass


WKT_READ_PTR = POINTER(WKTReader_st)
WKT_WRITE_PTR = POINTER(WKTWriter_st)
WKB_READ_PTR = POINTER(WKBReader_st)
WKB_WRITE_PTR = POINTER(WKBReader_st)

# WKTReader routines
wkt_reader_create = GEOSFuncFactory('GEOSWKTReader_create', restype=WKT_READ_PTR)
wkt_reader_destroy = GEOSFuncFactory('GEOSWKTReader_destroy', argtypes=[WKT_READ_PTR])

wkt_reader_read = GEOSFuncFactory(
    'GEOSWKTReader_read', argtypes=[WKT_READ_PTR, c_char_p], restype=GEOM_PTR, errcheck=check_geom
)
# WKTWriter routines
wkt_writer_create = GEOSFuncFactory('GEOSWKTWriter_create', restype=WKT_WRITE_PTR)
wkt_writer_destroy = GEOSFuncFactory('GEOSWKTWriter_destroy', argtypes=[WKT_WRITE_PTR])

wkt_writer_write = GEOSFuncFactory(
    'GEOSWKTWriter_write', argtypes=[WKT_WRITE_PTR, GEOM_PTR], restype=geos_char_p, errcheck=check_string
)

wkt_writer_get_outdim = GEOSFuncFactory(
    'GEOSWKTWriter_getOutputDimension', argtypes=[WKT_WRITE_PTR], restype=c_int
)
wkt_writer_set_outdim = GEOSFuncFactory(
    'GEOSWKTWriter_setOutputDimension', argtypes=[WKT_WRITE_PTR, c_int]
)

wkt_writer_set_trim = GEOSFuncFactory('GEOSWKTWriter_setTrim', argtypes=[WKT_WRITE_PTR, c_byte])
wkt_writer_set_precision = GEOSFuncFactory('GEOSWKTWriter_setRoundingPrecision', argtypes=[WKT_WRITE_PTR, c_int])

# WKBReader routines
wkb_reader_create = GEOSFuncFactory('GEOSWKBReader_create', restype=WKB_READ_PTR)
wkb_reader_destroy = GEOSFuncFactory('GEOSWKBReader_destroy', argtypes=[WKB_READ_PTR])


class WKBReadFunc(GEOSFuncFactory):
    # Although the function definitions take `const unsigned char *`
    # as their parameter, we use c_char_p here so the function may
    # take Python strings directly as parameters.  Inside Python there
    # is not a difference between signed and unsigned characters, so
    # it is not a problem.
    argtypes = [WKB_READ_PTR, c_char_p, c_size_t]
    restype = GEOM_PTR
    errcheck = staticmethod(check_geom)


wkb_reader_read = WKBReadFunc('GEOSWKBReader_read')
wkb_reader_read_hex = WKBReadFunc('GEOSWKBReader_readHEX')

# WKBWriter routines
wkb_writer_create = GEOSFuncFactory('GEOSWKBWriter_create', restype=WKB_WRITE_PTR)
wkb_writer_destroy = GEOSFuncFactory('GEOSWKBWriter_destroy', argtypes=[WKB_WRITE_PTR])


# WKB Writing prototypes.
class WKBWriteFunc(GEOSFuncFactory):
    argtypes = [WKB_WRITE_PTR, GEOM_PTR, POINTER(c_size_t)]
    restype = c_uchar_p
    errcheck = staticmethod(check_sized_string)


wkb_writer_write = WKBWriteFunc('GEOSWKBWriter_write')
wkb_writer_write_hex = WKBWriteFunc('GEOSWKBWriter_writeHEX')


# WKBWriter property getter/setter prototypes.
class WKBWriterGet(GEOSFuncFactory):
    argtypes = [WKB_WRITE_PTR]
    restype = c_int


class WKBWriterSet(GEOSFuncFactory):
    argtypes = [WKB_WRITE_PTR, c_int]


wkb_writer_get_byteorder = WKBWriterGet('GEOSWKBWriter_getByteOrder')
wkb_writer_set_byteorder = WKBWriterSet('GEOSWKBWriter_setByteOrder')
wkb_writer_get_outdim = WKBWriterGet('GEOSWKBWriter_getOutputDimension')
wkb_writer_set_outdim = WKBWriterSet('GEOSWKBWriter_setOutputDimension')
wkb_writer_get_include_srid = WKBWriterGet('GEOSWKBWriter_getIncludeSRID', restype=c_byte)
wkb_writer_set_include_srid = WKBWriterSet('GEOSWKBWriter_setIncludeSRID', argtypes=[WKB_WRITE_PTR, c_byte])


# ### Base I/O Class ###
class IOBase(GEOSBase):
    "Base class for GEOS I/O objects."
    def __init__(self):
        # Getting the pointer with the constructor.
        self.ptr = self._constructor()
        # Loading the real destructor function at this point as doing it in
        # __del__ is too late (import error).
        self.destructor.func

# ### Base WKB/WKT Reading and Writing objects ###


# Non-public WKB/WKT reader classes for internal use because
# their `read` methods return _pointers_ instead of GEOSGeometry
# objects.
class _WKTReader(IOBase):
    _constructor = wkt_reader_create
    ptr_type = WKT_READ_PTR
    destructor = wkt_reader_destroy

    def read(self, wkt):
        if not isinstance(wkt, (bytes, str)):
            raise TypeError
        return wkt_reader_read(self.ptr, force_bytes(wkt))


class _WKBReader(IOBase):
    _constructor = wkb_reader_create
    ptr_type = WKB_READ_PTR
    destructor = wkb_reader_destroy

    def read(self, wkb):
        "Return a _pointer_ to C GEOS Geometry object from the given WKB."
        if isinstance(wkb, memoryview):
            wkb_s = bytes(wkb)
            return wkb_reader_read(self.ptr, wkb_s, len(wkb_s))
        elif isinstance(wkb, (bytes, str)):
            return wkb_reader_read_hex(self.ptr, wkb, len(wkb))
        else:
            raise TypeError


# ### WKB/WKT Writer Classes ###
class WKTWriter(IOBase):
    _constructor = wkt_writer_create
    ptr_type = WKT_WRITE_PTR
    destructor = wkt_writer_destroy

    _trim = False
    _precision = None

    def __init__(self, dim=2, trim=False, precision=None):
        super().__init__()
        if bool(trim) != self._trim:
            self.trim = trim
        if precision is not None:
            self.precision = precision
        self.outdim = dim

    def write(self, geom):
        "Return the WKT representation of the given geometry."
        return wkt_writer_write(self.ptr, geom.ptr)

    @property
    def outdim(self):
        return wkt_writer_get_outdim(self.ptr)

    @outdim.setter
    def outdim(self, new_dim):
        if new_dim not in (2, 3):
            raise ValueError('WKT output dimension must be 2 or 3')
        wkt_writer_set_outdim(self.ptr, new_dim)

    @property
    def trim(self):
        return self._trim

    @trim.setter
    def trim(self, flag):
        if bool(flag) != self._trim:
            self._trim = bool(flag)
            wkt_writer_set_trim(self.ptr, self._trim)

    @property
    def precision(self):
        return self._precision

    @precision.setter
    def precision(self, precision):
        if (not isinstance(precision, int) or precision < 0) and precision is not None:
            raise AttributeError('WKT output rounding precision must be non-negative integer or None.')
        if precision != self._precision:
            self._precision = precision
            wkt_writer_set_precision(self.ptr, -1 if precision is None else precision)


class WKBWriter(IOBase):
    _constructor = wkb_writer_create
    ptr_type = WKB_WRITE_PTR
    destructor = wkb_writer_destroy
    geos_version = geos_version_tuple()

    def __init__(self, dim=2):
        super().__init__()
        self.outdim = dim

    def _handle_empty_point(self, geom):
        from django.contrib.gis.geos import Point
        if isinstance(geom, Point) and geom.empty:
            if self.srid:
                # PostGIS uses POINT(NaN NaN) for WKB representation of empty
                # points. Use it for EWKB as it's a PostGIS specific format.
                # https://trac.osgeo.org/postgis/ticket/3181
                geom = Point(float('NaN'), float('NaN'), srid=geom.srid)
            else:
                raise ValueError('Empty point is not representable in WKB.')
        return geom

    def write(self, geom):
        "Return the WKB representation of the given geometry."
        from django.contrib.gis.geos import Polygon
        geom = self._handle_empty_point(geom)
        wkb = wkb_writer_write(self.ptr, geom.ptr, byref(c_size_t()))
        if self.geos_version < (3, 6, 1) and isinstance(geom, Polygon) and geom.empty:
            # Fix GEOS output for empty polygon.
            # See https://trac.osgeo.org/geos/ticket/680.
            wkb = wkb[:-8] + b'\0' * 4
        return memoryview(wkb)

    def write_hex(self, geom):
        "Return the HEXEWKB representation of the given geometry."
        from django.contrib.gis.geos.polygon import Polygon
        geom = self._handle_empty_point(geom)
        wkb = wkb_writer_write_hex(self.ptr, geom.ptr, byref(c_size_t()))
        if self.geos_version < (3, 6, 1) and isinstance(geom, Polygon) and geom.empty:
            wkb = wkb[:-16] + b'0' * 8
        return wkb

    # ### WKBWriter Properties ###

    # Property for getting/setting the byteorder.
    def _get_byteorder(self):
        return wkb_writer_get_byteorder(self.ptr)

    def _set_byteorder(self, order):
        if order not in (0, 1):
            raise ValueError('Byte order parameter must be 0 (Big Endian) or 1 (Little Endian).')
        wkb_writer_set_byteorder(self.ptr, order)

    byteorder = property(_get_byteorder, _set_byteorder)

    # Property for getting/setting the output dimension.
    @property
    def outdim(self):
        return wkb_writer_get_outdim(self.ptr)

    @outdim.setter
    def outdim(self, new_dim):
        if new_dim not in (2, 3):
            raise ValueError('WKB output dimension must be 2 or 3')
        wkb_writer_set_outdim(self.ptr, new_dim)

    # Property for getting/setting the include srid flag.
    @property
    def srid(self):
        return bool(wkb_writer_get_include_srid(self.ptr))

    @srid.setter
    def srid(self, include):
        wkb_writer_set_include_srid(self.ptr, bool(include))


# `ThreadLocalIO` object holds instances of the WKT and WKB reader/writer
# objects that are local to the thread.  The `GEOSGeometry` internals
# access these instances by calling the module-level functions, defined
# below.
class ThreadLocalIO(threading.local):
    wkt_r = None
    wkt_w = None
    wkb_r = None
    wkb_w = None
    ewkb_w = None


thread_context = ThreadLocalIO()


# These module-level routines return the I/O object that is local to the
# thread. If the I/O object does not exist yet it will be initialized.
def wkt_r():
    thread_context.wkt_r = thread_context.wkt_r or _WKTReader()
    return thread_context.wkt_r


def wkt_w(dim=2, trim=False, precision=None):
    if not thread_context.wkt_w:
        thread_context.wkt_w = WKTWriter(dim=dim, trim=trim, precision=precision)
    else:
        thread_context.wkt_w.outdim = dim
        thread_context.wkt_w.trim = trim
        thread_context.wkt_w.precision = precision
    return thread_context.wkt_w


def wkb_r():
    thread_context.wkb_r = thread_context.wkb_r or _WKBReader()
    return thread_context.wkb_r


def wkb_w(dim=2):
    if not thread_context.wkb_w:
        thread_context.wkb_w = WKBWriter(dim=dim)
    else:
        thread_context.wkb_w.outdim = dim
    return thread_context.wkb_w


def ewkb_w(dim=2):
    if not thread_context.ewkb_w:
        thread_context.ewkb_w = WKBWriter(dim=dim)
        thread_context.ewkb_w.srid = True
    else:
        thread_context.ewkb_w.outdim = dim
    return thread_context.ewkb_w