import re
import shutil
import zipfile
import PIL.Image
import gws
import gws.ext.db.provider.postgres
import gws.gis.extent
import gws.gis.gdal2
import gws.gis.shape
import gws.qgis.project
import gws.tools.json2
import gws.tools.date
import gws.tools.os2 as os2
import gws.tools.job
import gws.types as t
class Stats(t.Data):
    """Counters describing the outcome of a single import run."""

    # number of plan records written to the plan table
    numRecords: int
    # number of png images (each with a pgw world file) copied to the data directory
    numPngs: int
    # number of pdf documents copied to the data directory
    numPdfs: int
def run(action, src_path: str, replace: bool, au_uid: str = None, job: gws.tools.job.Job = None) -> Stats:
    """Import bplan data from a file or a directory.

    Args:
        action: The bplan action object; supplies the database, table and
            data directory configuration used by the import.
        src_path: Path to an archive file or to a directory with the source data.
            A file is unpacked into a temporary directory first.
        replace: If true, existing data of the affected administrative units
            is removed before the new data is stored.
        au_uid: If given, restrict the import to this administrative unit.
        job: Optional job to report the progress to.

    Returns:
        Import statistics, or ``None`` if the job was terminated prematurely.
    """
    tmp_dir = None
    stats = None

    try:
        if os2.is_file(src_path):
            # a file is given - unpack it into a temp dir
            tmp_dir = gws.ensure_dir(gws.TMP_DIR + '/bplan_' + gws.random_string(32))
            _extract(src_path, tmp_dir)
        stats = _run2(action, tmp_dir or src_path, replace, au_uid, job)
    except gws.tools.job.PrematureTermination:
        # the job disappeared or is no longer running: stop quietly, return no stats
        pass
    finally:
        # always remove the temp dir, even if unpacking or importing failed
        if tmp_dir:
            shutil.rmtree(tmp_dir, ignore_errors=True)

    return stats
def update(action):
    """Refresh the pdf references and the qgis projects for all administrative units.

    The set of units is taken from the distinct `_au` values in the plan table.
    """
    with action.db.connect() as conn:
        rs = conn.select(f'SELECT DISTINCT _au FROM {conn.quote_table(action.plan_table.name)}')
        # materialize the result while the connection is still open
        au_uids = {r['_au'] for r in rs}

    _update_pdfs(action, au_uids)
    _create_qgis_projects(action, au_uids)
##
def _run2(action, src_dir, replace, au_uid, job):
    """Import shapes, images and documents from `src_dir`.

    The import runs in five steps, each reported to `job` (if any):
    read the shape files, store the records, copy png/pgw files,
    copy pdf files, then update pdf references and qgis projects.

    Args:
        action: The bplan action object (database, table, columns, data directory).
        src_dir: Directory with the source files.
        replace: If true, remove existing records and files of the affected
            administrative units before storing the new ones. Otherwise only
            records with the same `_uid` are replaced.
        au_uid: If given, only files and records of this administrative unit are used.
        job: Optional job to report the progress to.

    Returns:
        A `Stats` object with the number of records, pngs and pdfs imported.

    Raises:
        gws.tools.job.PrematureTermination: if the job is gone or not running anymore.
    """
    gws.log.debug(f'BEGIN {src_dir!r} au={au_uid!r}')

    stats = Stats(numRecords=0, numPngs=0, numPdfs=0)

    _update_job(job, step=0, steps=5)

    # iterate shape files and prepare a list of db records

    shp_paths = set()
    recs = {}

    for p in sorted(os2.find_files(src_dir, ext='shp')):
        # NB prefer '..._utf8.shp' variants if they exist
        if 'utf8' in p:
            shp_paths.discard(p.replace('utf8', ''))
        shp_paths.add(p)

    for p in sorted(shp_paths):
        gws.log.debug(f'read {p!r}')

        if au_uid and not _path_belongs_to_au(p, [au_uid]):
            continue

        with gws.gis.gdal2.from_path(p) as ds:
            for f in gws.gis.gdal2.features(ds, action.crs, encoding=_encoding(p)):
                r = {}

                # convert all attributes to strings
                for a in f.attributes:
                    if a.type == t.AttributeType.datetime:
                        val = gws.tools.date.to_iso_date(a.value)
                    else:
                        val = str(a.value)
                    r[a.name.lower()] = val

                r['_uid'] = uid = r[action.key_col]
                r['_au'] = r[action.au_key_col]

                # map the source type name to a configured type uid
                type_name = r.get(action.type_col, '')
                for ty in action.type_list:
                    if ty.srcName == type_name:
                        r['_type'] = ty.uid
                        break

                # the first feature with a given uid provides the attributes,
                # subsequent ones only contribute their geometries
                if uid not in recs:
                    recs[uid] = r

                if not f.shape:
                    # if no geometry found, create a point from x/y coords
                    try:
                        f.shape = gws.gis.shape.from_geometry({
                            "type": "Point",
                            "coordinates": [
                                float(r[action.x_coord_col]),
                                float(r[action.y_coord_col]),
                            ]
                        }, action.crs)
                    except Exception as exc:
                        # best effort: missing or invalid coordinates leave the record without geometry
                        gws.log.debug(f'no point geometry for {uid!r}: {exc!r}')

                if f.shape:
                    s = f.shape.to_multi()
                    recs[uid][_geom_name(s)] = s.ewkt

    _update_job(job, step=1)

    # insert records

    table: t.SqlTable = action.plan_table
    db: gws.ext.db.provider.postgres.Object = action.db

    recs = list(recs.values())
    au_uids = [au_uid] if au_uid else sorted(set(r['_au'] for r in recs))

    with db.connect() as conn:
        src = table.name
        with conn.transaction():
            for a in au_uids:
                au_recs = [r for r in recs if r['_au'] == a]
                if not au_recs:
                    continue

                gws.log.debug(f'insert {a!r} ({len(au_recs)})')
                stats.numRecords += len(au_recs)

                if replace:
                    # drop everything that belongs to this administrative unit
                    conn.execute(f'DELETE FROM {conn.quote_table(src)} WHERE _au = %s', [a])
                else:
                    # drop only the records that are about to be re-inserted
                    uids = [r['_uid'] for r in au_recs]
                    ph = ','.join(['%s'] * len(uids))
                    conn.execute(f'DELETE FROM {conn.quote_table(src)} WHERE _uid IN ({ph})', uids)

                conn.insert_many(src, au_recs)

    _update_job(job, step=2)

    # move png/pgw files into place

    dd = action.data_dir

    if replace:
        for p in os2.find_files(f'{dd}/png'):
            if _path_belongs_to_au(p, au_uids):
                gws.log.debug(f'delete {p}')
                os2.unlink(p)

    for p in os2.find_files(src_dir, ext='png'):
        if not _path_belongs_to_au(p, au_uids):
            continue
        # a png without a world file is skipped
        w = re.sub(r'\.png$', '.pgw', p)
        if not os2.is_file(w):
            continue
        fb = _fnbody(p)
        gws.log.debug(f'copy {fb}.png')
        shutil.copyfile(p, f'{dd}/png/{fb}.png')
        shutil.copyfile(w, f'{dd}/png/{fb}.pgw')
        stats.numPngs += 1

    _update_job(job, step=3)

    # move pdfs into place

    if replace:
        for p in os2.find_files(f'{dd}/pdf'):
            if _path_belongs_to_au(p, au_uids):
                gws.log.debug(f'delete {p}')
                os2.unlink(p)

    for p in os2.find_files(src_dir, ext='pdf'):
        if not _path_belongs_to_au(p, au_uids):
            continue
        fb = _fnbody(p)
        gws.log.debug(f'copy {fb}.pdf')
        shutil.copyfile(p, f'{dd}/pdf/{fb}.pdf')
        stats.numPdfs += 1

    _update_job(job, step=4)

    #

    _update_pdfs(action, au_uids)

    _update_job(job, step=5)

    #

    _create_qgis_projects(action, au_uids)

    _update_job(job, state=gws.tools.job.State.complete)

    gws.log.debug(f'END {src_dir!r}')

    return stats
def _update_pdfs(action, au_uids):
    """Store the names of the pdf files of each plan in its `medien` column.

    A pdf in the `pdf` subdirectory of the data directory is assigned to the
    first plan (of the given administrative units) whose `_uid` is a prefix
    of the file name. Plans without pdfs are not updated.
    """
    gws.log.debug(f'update pdfs for {au_uids!r}')

    pdf_dir = action.data_dir + '/pdf'

    # collect the uids of all plans that belong to the given administrative units
    with action.db.connect() as conn:
        rows = conn.select(f'SELECT _au, _uid FROM {conn.quote_table(action.plan_table.name)}')
        pdfs_by_uid = {row['_uid']: [] for row in rows if row['_au'] in au_uids}

    # attach each pdf to the first plan whose uid starts its file name
    for path in os2.find_files(pdf_dir, ext='pdf'):
        fname = _filename(path)
        for plan_uid in pdfs_by_uid:
            if fname.startswith(plan_uid):
                pdfs_by_uid[plan_uid].append(fname)
                break

    with action.db.connect() as conn:
        with conn.transaction():
            for plan_uid, fnames in pdfs_by_uid.items():
                if not fnames:
                    continue
                gws.log.debug(f'save pdfs for {plan_uid}')
                joined = ','.join(fnames)
                conn.execute(f'UPDATE {conn.quote_table(action.plan_table.name)} SET medien=%s WHERE _uid=%s', [joined, plan_uid])
def _create_qgis_projects(action, au_uids):
    """Render one qgis project file per administrative unit.

    For each unit in `au_uids` the project is written to
    `<data_dir>/qgs/<au_uid>.qgs`. A unit without layers gets its project
    file removed; a unit without an extent is left untouched.
    """
    gws.log.debug(f'create qgis projects for {au_uids!r}')

    dd = action.data_dir

    extents = _enum_extents(action, au_uids)
    layers = _enum_layers(action, au_uids)

    for au_uid in au_uids:
        path = f'{dd}/qgs/{au_uid}.qgs'

        ls = [la for la in layers if la['au_uid'] == au_uid]
        if not ls:
            # nothing to show for this unit: drop a stale project,
            # but keep processing the remaining units
            os2.unlink(path)
            continue

        ext = extents.get(au_uid)
        if not ext:
            continue

        res = action.qgis_template.render({
            'extent': ext,
            'layers': ls
        })

        gws.write_file(path, res.content)
        gws.log.debug(f'created {path!r}')
def _enum_extents(action, au_uids):
    """Return a dict `au uid -> extent` for the given administrative units.

    The extent of a unit is computed by the database (`ST_Extent`) over the
    `_geom_p` column of all its plans and converted with `gws.gis.extent.from_box`.
    """
    with action.db.connect() as conn:
        table = conn.quote_table(action.plan_table.name)
        rows = conn.select(f'SELECT _au, ST_Extent(_geom_p) AS p FROM {table} GROUP BY _au')
        return {
            row['_au']: gws.gis.extent.from_box(row['p'])
            for row in rows
            if row['_au'] in au_uids
        }
def _enum_layers(action, au_uids):
    """Build the layer descriptions for the qgis projects of the given units.

    One layer is created per combination of plan type, geometry kind and
    administrative unit. The geometry kind is `p`, `l` or `x` for the
    respective `_geom_*` column, or `r` for a raster layer that collects
    the images whose file name starts with the uid of a plan.

    Returns:
        A list of layer dicts, sorted by type name. The images of each layer
        are sorted by file name in descending order.
    """
    all_images = _enum_images(action)
    aus_by_uid = {au.uid: au for au in action.au_list}
    types_by_uid = {ty.uid: ty for ty in action.type_list}
    layers_by_uid = {}

    def _layer_uid(rec, geom_type):
        return '_'.join([rec['_type'].lower(), geom_type, rec['_au']])

    def _new_layer(rec, geom_type):
        au = aus_by_uid.get(rec['_au'])
        ty = types_by_uid.get(rec['_type'])
        return {
            'uid': _layer_uid(rec, geom_type),
            'geom': geom_type,
            'type': rec['_type'],
            'type_name': ty.name if ty else '',
            'au_uid': rec['_au'],
            'au_name': au.name if au else '',
            'color': ty.color if ty else '',
            'images': [],
        }

    def _ensure_layer(rec, geom_type):
        # fetch the layer for this record/geometry kind, creating it on first use
        key = _layer_uid(rec, geom_type)
        if key not in layers_by_uid:
            layers_by_uid[key] = _new_layer(rec, geom_type)
        return layers_by_uid[key]

    with action.db.connect() as conn:
        table = conn.quote_table(action.plan_table.name)
        rows = conn.select(f'SELECT _uid, _au, _type, _geom_p, _geom_l, _geom_x FROM {table} ORDER BY _uid')

        for rec in rows:
            if rec['_au'] not in au_uids:
                continue

            # vector layers: one per geometry column that has a value
            for g in 'plx':
                if rec['_geom_' + g]:
                    _ensure_layer(rec, g)

            # raster layer: all images that are named after this plan
            rec_images = [img for img in all_images if img['fname'].startswith(rec['_uid'])]
            if rec_images:
                _ensure_layer(rec, 'r')['images'].extend(rec_images)

    result = list(layers_by_uid.values())
    result.sort(key=lambda la: la['type_name'])

    for la in result:
        la['images'].sort(key=lambda img: img['fname'], reverse=True)

    return result
def _enum_images(action):
    """Enumerate the png images in the data directory, converting them if needed.

    For each file in `<data_dir>/png` a palette-reduced copy is kept in
    `<data_dir>/cnv`, together with a copy of its pgw file. The copy is
    (re)created when its mtime is older than that of the source image.
    Images that fail to convert are logged and skipped.

    Returns:
        A list of dicts with the keys `uid`, `fname`, `path` (of the
        converted image) and `palette`.
    """
    dd = action.data_dir
    images = []

    for path in os2.find_files(f'{dd}/png', ext='png'):
        fn = _fnbody(path)
        converted_path = f'{dd}/cnv/{fn}.png'

        if os2.file_mtime(converted_path) < os2.file_mtime(path):
            try:
                # reduce the image palette (20-30 colors work just fine for scanned plans)
                gws.log.debug(f'converting {path!r}')

                # close the source file as soon as the pixel data is converted
                with PIL.Image.open(path) as src:
                    img = src.convert('RGBA')
                img = img.convert('P', palette=PIL.Image.ADAPTIVE, colors=action.image_quality)
                img.save(converted_path)

                # copy the pgw along
                pgw = gws.read_file(f'{dd}/png/{fn}.pgw')
                gws.write_file(f'{dd}/cnv/{fn}.pgw', pgw)
            except Exception as e:
                gws.log.error(f'error converting {path!r}: {e}')
                continue

        images.append({
            'uid': '_r_' + fn,
            'fname': fn,
            'path': converted_path,
            'palette': _image_palette(converted_path)
        })

    return images
def _image_palette(path):
    """Read the color palette of a paletted image.

    Returns:
        A list of `[color, alpha]` pairs, one per palette entry (at most 256),
        where `color` is a `#rrggbb` string and `alpha` is 0..255.
        An image without a palette yields an empty list.
    """
    # the context manager closes the underlying file once the data is read
    with PIL.Image.open(path) as img:
        palette = img.getpalette()
        # transparency is either a 256-bytes array for each entry or an integer index
        transparency = img.info.get('transparency', None)

    if not palette:
        return []

    # an incomplete transparency array is ignored, all entries are then opaque
    if isinstance(transparency, bytes) and len(transparency) < 256:
        transparency = None

    # the palette is a flat r, g, b, r, g, b... list and can be shorter than 256 entries
    num_entries = min(256, len(palette) // 3)

    colors = []

    for n in range(num_entries):
        r, g, b = palette[n * 3:n * 3 + 3]
        if isinstance(transparency, int):
            alpha = 0 if n == transparency else 0xFF
        elif isinstance(transparency, bytes):
            alpha = transparency[n]
        else:
            alpha = 0xFF
        colors.append(['#%02x%02x%02x' % (r, g, b), alpha])

    return colors
def _extract(zip_path, target_dir):
    """Unpack a zip archive into `target_dir`, flattening its directory structure.

    Only the file name of each archive member is used, so all files end up
    directly in `target_dir`, and members with the same file name overwrite
    each other. Members with an empty name or a name starting with a dot
    are skipped.
    """
    # the context manager closes the archive even if extraction fails
    with zipfile.ZipFile(zip_path) as zf:
        for fi in zf.infolist():
            fn = _filename(fi.filename)
            if not fn or fn.startswith('.'):
                continue
            with zf.open(fi) as src, open(target_dir + '/' + fn, 'wb') as dst:
                gws.log.debug(f'unzip {fn!r}')
                shutil.copyfileobj(src, dst)
def _encoding(path):
    """Return the text encoding to use for the shape file at `path`.

    Returns:
        `None` if a `.cpg` file exists next to the shape file,
        `'utf8'` if the path contains `utf8`, otherwise `'ISO-8859-1'`.
    """
    if os2.is_file(path.replace('.shp', '.cpg')):
        # have a cpg file, let gdal handle the encoding
        return None
    # NB the codec name must be spelled with plain ASCII hyphens
    return 'utf8' if 'utf8' in path else 'ISO-8859-1'
def _geom_name(s: t.IShape):
    """Return the name of the geometry column for the type of the shape `s`.

    Raises:
        ValueError: if the shape is not a multipoint, multilinestring or multipolygon.
    """
    columns = (
        (t.GeometryType.multipoint, '_geom_x'),
        (t.GeometryType.multilinestring, '_geom_l'),
        (t.GeometryType.multipolygon, '_geom_p'),
    )
    for geom_type, column in columns:
        if s.type == geom_type:
            return column
    raise ValueError(f'invalid geometry type: {s.type!r}')
def _filename(path):
    """Return the `filename` component of `path`, as reported by `os2.parse_path`."""
    parts = os2.parse_path(path)
    return parts['filename']
def _fnbody(path):
    """Return the `name` component of `path`, as reported by `os2.parse_path`."""
    parts = os2.parse_path(path)
    return parts['name']
def _path_belongs_to_au(path, au_uids):
    """Check if the file name of `path` refers to one of the given administrative units."""
    fn = _filename(path)

    # filename is like AAAAnnn.png or Shapes_AAAA_xxx.shp, where AAAA = au uid
    return any(
        fn.startswith(a) or (a + '_') in fn or ('_' + a) in fn
        for a in au_uids
    )
def _diff(a, b):
d = {}
for k in a.keys() | b.keys():
if a.get(k) != b.get(k):
d[k] = a[k], b[k]
return d
def _update_job(job, **kwargs):
    """Update the stored job with `kwargs`, provided it is still running.

    Does nothing if no job is given.

    Raises:
        gws.tools.job.PrematureTermination: if the job cannot be found
            or is not in the `running` state.
    """
    if not job:
        return

    # re-read the job, as its state can change while the import is running
    current = gws.tools.job.get(job.uid)

    if current and current.state == gws.tools.job.State.running:
        current.update(**kwargs)
        return

    reason = f'WRONG_STATE={current.state}' if current else 'NOT_FOUND'
    raise gws.tools.job.PrematureTermination(reason)