1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339 | import threading
from ctypes import POINTER, Structure, byref, c_byte, c_char_p, c_int, c_size_t
from django.contrib.gis.geos.base import GEOSBase
from django.contrib.gis.geos.libgeos import (
GEOM_PTR, GEOSFuncFactory, geos_version_tuple,
)
from django.contrib.gis.geos.prototypes.errcheck import (
check_geom, check_sized_string, check_string,
)
from django.contrib.gis.geos.prototypes.geom import c_uchar_p, geos_char_p
from django.utils.encoding import force_bytes
# ### The WKB/WKT Reader/Writer structures and pointers ###
class WKTReader_st(Structure):
    """Field-less placeholder for the GEOS WKT reader; used only via pointers."""


class WKTWriter_st(Structure):
    """Field-less placeholder for the GEOS WKT writer; used only via pointers."""


class WKBReader_st(Structure):
    """Field-less placeholder for the GEOS WKB reader; used only via pointers."""


class WKBWriter_st(Structure):
    """Field-less placeholder for the GEOS WKB writer; used only via pointers."""


# Pointer types used in the argtypes/restype of the prototypes in this module.
WKT_READ_PTR = POINTER(WKTReader_st)
WKT_WRITE_PTR = POINTER(WKTWriter_st)
WKB_READ_PTR = POINTER(WKBReader_st)
# Built from the *writer* structure (not the reader one) so that WKB reader
# and WKB writer handles are distinct ctypes pointer types.
WKB_WRITE_PTR = POINTER(WKBWriter_st)
# Prototypes for the GEOS C functions that create, destroy and use a WKT
# reader handle.
wkt_reader_create = GEOSFuncFactory(
    'GEOSWKTReader_create',
    restype=WKT_READ_PTR,
)
wkt_reader_destroy = GEOSFuncFactory(
    'GEOSWKTReader_destroy',
    argtypes=[WKT_READ_PTR],
)
# Takes the reader handle and the WKT as a C string; the result is checked
# by `check_geom`.
wkt_reader_read = GEOSFuncFactory(
    'GEOSWKTReader_read',
    argtypes=[WKT_READ_PTR, c_char_p],
    restype=GEOM_PTR,
    errcheck=check_geom,
)
# Prototypes for the GEOS C functions that create, destroy, use and configure
# a WKT writer handle.
wkt_writer_create = GEOSFuncFactory(
    'GEOSWKTWriter_create',
    restype=WKT_WRITE_PTR,
)
wkt_writer_destroy = GEOSFuncFactory(
    'GEOSWKTWriter_destroy',
    argtypes=[WKT_WRITE_PTR],
)
# Takes the writer handle and a geometry pointer; the result is checked by
# `check_string`.
wkt_writer_write = GEOSFuncFactory(
    'GEOSWKTWriter_write',
    argtypes=[WKT_WRITE_PTR, GEOM_PTR],
    restype=geos_char_p,
    errcheck=check_string,
)

# Output dimension getter/setter.
wkt_writer_get_outdim = GEOSFuncFactory(
    'GEOSWKTWriter_getOutputDimension',
    argtypes=[WKT_WRITE_PTR],
    restype=c_int,
)
wkt_writer_set_outdim = GEOSFuncFactory(
    'GEOSWKTWriter_setOutputDimension',
    argtypes=[WKT_WRITE_PTR, c_int],
)

# Trim flag and rounding precision setters.
wkt_writer_set_trim = GEOSFuncFactory(
    'GEOSWKTWriter_setTrim',
    argtypes=[WKT_WRITE_PTR, c_byte],
)
wkt_writer_set_precision = GEOSFuncFactory(
    'GEOSWKTWriter_setRoundingPrecision',
    argtypes=[WKT_WRITE_PTR, c_int],
)
# Prototypes for creating and destroying a GEOS WKB reader handle.
wkb_reader_create = GEOSFuncFactory(
    'GEOSWKBReader_create',
    restype=WKB_READ_PTR,
)
wkb_reader_destroy = GEOSFuncFactory(
    'GEOSWKBReader_destroy',
    argtypes=[WKB_READ_PTR],
)
class WKBReadFunc(GEOSFuncFactory):
    """
    Shared prototype for the WKB reading functions: reader handle, input
    buffer and buffer size in, geometry pointer (checked by `check_geom`) out.
    """
    restype = GEOM_PTR
    errcheck = staticmethod(check_geom)
    # The C declarations use `const unsigned char *` for the buffer. c_char_p
    # is declared instead so that Python byte strings can be handed over
    # directly; Python itself makes no signed/unsigned char distinction, so
    # nothing is lost.
    argtypes = [WKB_READ_PTR, c_char_p, c_size_t]


# Binary and hexadecimal flavours share the prototype above.
wkb_reader_read = WKBReadFunc('GEOSWKBReader_read')
wkb_reader_read_hex = WKBReadFunc('GEOSWKBReader_readHEX')
# Prototypes for creating and destroying a GEOS WKB writer handle.
wkb_writer_create = GEOSFuncFactory(
    'GEOSWKBWriter_create',
    restype=WKB_WRITE_PTR,
)
wkb_writer_destroy = GEOSFuncFactory(
    'GEOSWKBWriter_destroy',
    argtypes=[WKB_WRITE_PTR],
)
# Prototype shared by the WKB writing functions.
class WKBWriteFunc(GEOSFuncFactory):
    """
    Writer handle, geometry pointer and a pointer to a size_t in; unsigned
    char buffer out, post-processed by `check_sized_string`.
    """
    restype = c_uchar_p
    errcheck = staticmethod(check_sized_string)
    argtypes = [WKB_WRITE_PTR, GEOM_PTR, POINTER(c_size_t)]


# Binary and hexadecimal flavours share the prototype above.
wkb_writer_write = WKBWriteFunc('GEOSWKBWriter_write')
wkb_writer_write_hex = WKBWriteFunc('GEOSWKBWriter_writeHEX')
# Prototypes for reading and changing WKB writer settings.
class WKBWriterGet(GEOSFuncFactory):
    """Getter prototype: takes the writer handle, returns a C int by default."""
    restype = c_int
    argtypes = [WKB_WRITE_PTR]


class WKBWriterSet(GEOSFuncFactory):
    """Setter prototype: takes the writer handle and a C int by default."""
    argtypes = [WKB_WRITE_PTR, c_int]


# Byte order.
wkb_writer_get_byteorder = WKBWriterGet('GEOSWKBWriter_getByteOrder')
wkb_writer_set_byteorder = WKBWriterSet('GEOSWKBWriter_setByteOrder')

# Output dimension.
wkb_writer_get_outdim = WKBWriterGet('GEOSWKBWriter_getOutputDimension')
wkb_writer_set_outdim = WKBWriterSet('GEOSWKBWriter_setOutputDimension')

# Include-SRID flag; declared with c_byte rather than the default c_int.
wkb_writer_get_include_srid = WKBWriterGet(
    'GEOSWKBWriter_getIncludeSRID',
    restype=c_byte,
)
wkb_writer_set_include_srid = WKBWriterSet(
    'GEOSWKBWriter_setIncludeSRID',
    argtypes=[WKB_WRITE_PTR, c_byte],
)
# ### Base I/O Class ###
class IOBase(GEOSBase):
    """
    Base class for GEOS I/O objects.

    Subclasses supply `_constructor` (called with no arguments to obtain the
    handle stored in `ptr`) and `destructor`.
    """

    def __init__(self):
        # Obtain the underlying handle from the subclass' constructor.
        handle = self._constructor()
        self.ptr = handle
        # Touch the destructor's `func` attribute now so the real function is
        # loaded up front; loading it in __del__ is too late (import error).
        getattr(self.destructor, 'func')
# ### Base WKB/WKT Reading and Writing objects ###
# Non-public WKB/WKT reader classes for internal use because
# their `read` methods return _pointers_ instead of GEOSGeometry
# objects.
class _WKTReader(IOBase):
    """Internal WKT reader; `read` returns a geometry _pointer_."""
    _constructor = wkt_reader_create
    ptr_type = WKT_READ_PTR
    destructor = wkt_reader_destroy

    def read(self, wkt):
        """
        Parse `wkt` (bytes or str) with the GEOS WKT reader and return the
        result of `wkt_reader_read`. Raise TypeError for any other input type.
        """
        if isinstance(wkt, (bytes, str)):
            # The C prototype expects a byte string.
            return wkt_reader_read(self.ptr, force_bytes(wkt))
        raise TypeError
class _WKBReader(IOBase):
    """Internal WKB reader; `read` returns a geometry _pointer_."""
    _constructor = wkb_reader_create
    ptr_type = WKB_READ_PTR
    destructor = wkb_reader_destroy

    def read(self, wkb):
        """
        Return a _pointer_ to C GEOS Geometry object from the given WKB.

        A memoryview is read as binary WKB; bytes or str are read as
        hexadecimal WKB. Raise TypeError for any other input type.
        """
        if isinstance(wkb, memoryview):
            wkb_s = bytes(wkb)
            return wkb_reader_read(self.ptr, wkb_s, len(wkb_s))
        if isinstance(wkb, (bytes, str)):
            # The prototype declares the buffer as c_char_p, which accepts
            # bytes but not str, so text input has to be encoded first. The
            # length passed along is that of the encoded buffer.
            hex_s = wkb.encode() if isinstance(wkb, str) else wkb
            return wkb_reader_read_hex(self.ptr, hex_s, len(hex_s))
        raise TypeError
# ### WKB/WKT Writer Classes ###
class WKTWriter(IOBase):
    """Writer producing WKT from geometries through a GEOS WKT writer handle."""
    _constructor = wkt_writer_create
    ptr_type = WKT_WRITE_PTR
    destructor = wkt_writer_destroy

    # Last values sent to GEOS by the setters below (the class-level defaults
    # apply until a setter runs).
    _trim = False
    _precision = None

    def __init__(self, dim=2, trim=False, precision=None):
        """Create the writer and apply the trim, precision and dimension."""
        super().__init__()
        # Only go through the setters when the request differs from the
        # initial state.
        if self._trim != bool(trim):
            self.trim = trim
        if precision is not None:
            self.precision = precision
        self.outdim = dim

    def write(self, geom):
        "Return the WKT representation of the given geometry."
        return wkt_writer_write(self.ptr, geom.ptr)

    @property
    def outdim(self):
        """Output dimension as reported by GEOS."""
        return wkt_writer_get_outdim(self.ptr)

    @outdim.setter
    def outdim(self, new_dim):
        if new_dim in (2, 3):
            wkt_writer_set_outdim(self.ptr, new_dim)
            return
        raise ValueError('WKT output dimension must be 2 or 3')

    @property
    def trim(self):
        """Trim flag last applied through this object."""
        return self._trim

    @trim.setter
    def trim(self, flag):
        wanted = bool(flag)
        if wanted == self._trim:
            # Nothing changed, no need to call into GEOS.
            return
        self._trim = wanted
        wkt_writer_set_trim(self.ptr, self._trim)

    @property
    def precision(self):
        """Rounding precision last applied through this object, or None."""
        return self._precision

    @precision.setter
    def precision(self, precision):
        if precision is not None and (not isinstance(precision, int) or precision < 0):
            raise AttributeError('WKT output rounding precision must be non-negative integer or None.')
        if precision != self._precision:
            self._precision = precision
            # None is handed to GEOS as -1.
            geos_precision = -1 if precision is None else precision
            wkt_writer_set_precision(self.ptr, geos_precision)
class WKBWriter(IOBase):
    """Writer producing (HEX)WKB from geometries through a GEOS WKB writer handle."""
    _constructor = wkb_writer_create
    ptr_type = WKB_WRITE_PTR
    destructor = wkb_writer_destroy
    # Evaluated once, when the class body runs; consulted by the empty
    # polygon workaround in write() and write_hex().
    geos_version = geos_version_tuple()

    def __init__(self, dim=2):
        """Create the writer and set its output dimension."""
        super().__init__()
        self.outdim = dim

    def _handle_empty_point(self, geom):
        """
        Return the geometry to serialize in place of `geom`.

        Anything but an empty Point is returned unchanged. An empty Point
        becomes POINT(NaN NaN) when the SRID flag is on, and raises ValueError
        otherwise.
        """
        from django.contrib.gis.geos import Point
        if not (isinstance(geom, Point) and geom.empty):
            return geom
        if not self.srid:
            raise ValueError('Empty point is not representable in WKB.')
        # PostGIS uses POINT(NaN NaN) for WKB representation of empty
        # points. Use it for EWKB as it's a PostGIS specific format.
        # https://trac.osgeo.org/postgis/ticket/3181
        return Point(float('NaN'), float('NaN'), srid=geom.srid)

    def write(self, geom):
        "Return the WKB representation of the given geometry."
        from django.contrib.gis.geos import Polygon
        geom = self._handle_empty_point(geom)
        size = c_size_t()
        wkb = wkb_writer_write(self.ptr, geom.ptr, byref(size))
        old_geos = self.geos_version < (3, 6, 1)
        if old_geos and isinstance(geom, Polygon) and geom.empty:
            # Fix GEOS output for empty polygon: the last 8 bytes are
            # replaced by 4 zero bytes.
            # See https://trac.osgeo.org/geos/ticket/680.
            wkb = wkb[:-8] + b'\0' * 4
        return memoryview(wkb)

    def write_hex(self, geom):
        "Return the HEXEWKB representation of the given geometry."
        from django.contrib.gis.geos.polygon import Polygon
        geom = self._handle_empty_point(geom)
        size = c_size_t()
        wkb = wkb_writer_write_hex(self.ptr, geom.ptr, byref(size))
        old_geos = self.geos_version < (3, 6, 1)
        if old_geos and isinstance(geom, Polygon) and geom.empty:
            # Hex counterpart of the empty polygon fix in write(): the last
            # 16 hex digits are replaced by 8 zero digits.
            wkb = wkb[:-16] + b'0' * 8
        return wkb

    # ### WKBWriter Properties ###

    # Byte order, exposed through the `byteorder` property.
    def _get_byteorder(self):
        return wkb_writer_get_byteorder(self.ptr)

    def _set_byteorder(self, order):
        if order in (0, 1):
            wkb_writer_set_byteorder(self.ptr, order)
            return
        raise ValueError('Byte order parameter must be 0 (Big Endian) or 1 (Little Endian).')

    byteorder = property(_get_byteorder, _set_byteorder)

    # Output dimension.
    @property
    def outdim(self):
        """Output dimension as reported by GEOS."""
        return wkb_writer_get_outdim(self.ptr)

    @outdim.setter
    def outdim(self, new_dim):
        if new_dim in (2, 3):
            wkb_writer_set_outdim(self.ptr, new_dim)
            return
        raise ValueError('WKB output dimension must be 2 or 3')

    # Include-SRID flag.
    @property
    def srid(self):
        """Whether the GEOS writer's include-SRID flag is set, as a bool."""
        flag = wkb_writer_get_include_srid(self.ptr)
        return bool(flag)

    @srid.setter
    def srid(self, include):
        flag = bool(include)
        wkb_writer_set_include_srid(self.ptr, flag)
# `ThreadLocalIO` object holds instances of the WKT and WKB reader/writer
# objects that are local to the thread. The `GEOSGeometry` internals
# access these instances by calling the module-level functions, defined
# below.
class ThreadLocalIO(threading.local):
    """
    Per-thread slots for the WKT/WKB reader and writer objects.

    Every slot defaults to None and is filled in by the module-level accessor
    functions.
    """
    wkt_r = wkt_w = wkb_r = wkb_w = ewkb_w = None


# Single module-wide holder; attribute values are local to each thread.
thread_context = ThreadLocalIO()
# These module-level routines return the I/O object that is local to the
# thread. If the I/O object does not exist yet it will be initialized.
def wkt_r():
    """Return this thread's `_WKTReader`, creating it if the slot is empty."""
    reader = thread_context.wkt_r
    if not reader:
        reader = _WKTReader()
    thread_context.wkt_r = reader
    return thread_context.wkt_r
def wkt_w(dim=2, trim=False, precision=None):
    """
    Return this thread's `WKTWriter` configured with the given output
    dimension, trim flag and precision, creating it if the slot is empty.
    """
    writer = thread_context.wkt_w
    if writer:
        # Reuse the existing writer, re-applying every setting.
        writer.outdim = dim
        writer.trim = trim
        writer.precision = precision
        return writer
    thread_context.wkt_w = WKTWriter(dim=dim, trim=trim, precision=precision)
    return thread_context.wkt_w
def wkb_r():
    """Return this thread's `_WKBReader`, creating it if the slot is empty."""
    reader = thread_context.wkb_r
    if not reader:
        reader = _WKBReader()
    thread_context.wkb_r = reader
    return thread_context.wkb_r
def wkb_w(dim=2):
    """
    Return this thread's `WKBWriter` set to the given output dimension,
    creating it if the slot is empty.
    """
    writer = thread_context.wkb_w
    if writer:
        writer.outdim = dim
        return writer
    thread_context.wkb_w = WKBWriter(dim=dim)
    return thread_context.wkb_w
def ewkb_w(dim=2):
    """
    Return this thread's SRID-including `WKBWriter` set to the given output
    dimension, creating it if the slot is empty.
    """
    writer = thread_context.ewkb_w
    if writer:
        writer.outdim = dim
        return writer
    thread_context.ewkb_w = WKBWriter(dim=dim)
    # The SRID flag is switched on once, when the writer is created.
    thread_context.ewkb_w.srid = True
    return thread_context.ewkb_w
|