Source code for gws.gis.gdal2

"""GDAL wrappers"""

import datetime
import contextlib

import osgeo.gdal
import osgeo.ogr

import gws
import gws.gis.feature
import gws.gis.shape

import gws.types as t


[docs]@contextlib.contextmanager def from_string(s, **opts): fname = '/vsimem/' + gws.random_string(64) osgeo.gdal.FileFromMemBuffer(fname, s) ds = osgeo.gdal.OpenEx(fname, **opts) yield ds del ds osgeo.gdal.Unlink(fname)
[docs]@contextlib.contextmanager def from_path(path, **opts): ds = osgeo.gdal.OpenEx(path, **opts) yield ds del ds
[docs]def features(ds, crs=None, encoding=None): fs = [] for n in range(ds.GetLayerCount()): layer = ds.GetLayer(n) layer_name = layer.GetName() for feature in layer: atts = {} for k in range(feature.GetFieldCount()): fdef = feature.GetFieldDefnRef(k) name = fdef.GetName() type_val = _value(feature, k, fdef, encoding) if type_val: atts[name] = t.Attribute(name=name, type=type_val[0], value=type_val[1]) uid = feature.GetFID() if not uid and 'fid' in atts: uid = atts.pop('fid').value fs.append(gws.gis.feature.Feature( uid=uid, category=layer_name, shape=_shape(feature.GetGeomFieldRef(0), crs), attributes=list(atts.values()) )) return fs
def _value(feature, k, fdef, encoding): ft = fdef.type if ft == osgeo.ogr.OFTString: if encoding: return t.AttributeType.str, feature.GetFieldAsBinary(k).decode(encoding) else: return t.AttributeType.str, feature.GetFieldAsString(k) if ft in (osgeo.ogr.OFTDate, osgeo.ogr.OFTTime, osgeo.ogr.OFTDateTime): # python GetFieldAsDateTime appears to use float seconds, as in # GetFieldAsDateTime (int i, int *pnYear, int *pnMonth, int *pnDay, int *pnHour, int *pnMinute, float *pfSecond, int *pnTZFlag) # v = feature.GetFieldAsDateTime(k) try: v = datetime.datetime(v[0], v[1], v[2], v[3], v[4], int(v[5])) return t.AttributeType.datetime, v except ValueError: return if ft == osgeo.ogr.OFSTBoolean: return t.AttributeType.bool, feature.GetFieldAsInteger(k) > 0 if ft in (osgeo.ogr.OFTInteger, osgeo.ogr.OFTInteger64): return t.AttributeType.int, feature.GetFieldAsInteger(k) if ft in (osgeo.ogr.OFTIntegerList, osgeo.ogr.OFTInteger64List): return t.AttributeType.list, feature.GetFieldAsIntegerList(k) if ft in (osgeo.ogr.OFTReal, osgeo.ogr.OFSTFloat32): return t.AttributeType.float, feature.GetFieldAsDouble(k) if ft == osgeo.ogr.OFTRealList: return t.AttributeType.list, feature.GetFieldAsDoubleList(k) if ft == osgeo.ogr.OFTBinary: return t.AttributeType.bytes, feature.GetFieldAsBinary(k) # @TODO # osgeo.ogr.OFTStringList # osgeo.ogr.OFTWideString # osgeo.ogr.OFTWideStringList def _shape(geom, crs): if not geom: return if not crs: sr = geom.GetSpatialReference() if not sr: return crs = _crs(sr) if not crs: return return gws.gis.shape.from_wkt(geom.ExportToWkt(), crs) def _crs(sr): a = sr.GetAuthorityName(None) c = sr.GetAuthorityCode(None) if a and c: return a + ':' + c # @TODO deal with non-EPSG codes