Source code for gws.ext.helper.alkis.cli

import time

from argh import arg

import gws
import gws.config
import gws.config.loader
import gws.tools.clihelpers as clihelpers
import gws.tools.json2

import gws.ext.helper.alkis as alkis

import gws.types as t

from .util import nas

# NOTE(review): presumably the name under which the CLI loader groups the
# commands defined in this module -- confirm against the loader.
COMMAND = 'alkis'


@arg('--path', help='path to the NAS zip archive')
def parse(path=None):
    """Preprocess the NAS data model files"""

    # NOTE(review): the docstring above is presumably shown as CLI help by
    # argh, so it is intentionally left short; details live in comments.
    #
    # The expected archive is 'NAS_6.0.zip', which can be downloaded from
    # http://www.adv-online.de/AAA-Modell/Dokumente-der-GeoInfoDok/GeoInfoDok-6.0/
    # under "Das externe Modell, Datenaustausch".
    #
    # `path` is handed to nas.parse_properties unchanged (None by default);
    # the parsed properties are written to stdout as pretty-printed JSON.
    parsed = nas.parse_properties(path)
    rendered = gws.tools.json2.to_string(parsed, pretty=True)
    print(rendered)
def create_index():
    """Create an internal ALKIS search index."""

    # NOTE(review): the docstring above is presumably shown as CLI help,
    # so it is kept verbatim.
    helper = _get_alkis()
    if not helper:
        # _get_alkis() logs the reason before returning nothing.
        return

    user, password = clihelpers.database_credentials()

    # A monotonic clock is used for the duration so that system clock
    # adjustments during a long index build cannot skew (or negate) it.
    # The local is named `elapsed` to avoid shadowing the module-level
    # alias `t` (gws.types).
    started = time.monotonic()
    helper.create_index(user, password)
    elapsed = time.monotonic() - started

    gws.log.info('index done in %.2f sec' % elapsed)
def check_index():
    """Check the status of the ALKIS search index."""

    # NOTE(review): the docstring above is presumably shown as CLI help,
    # so it is kept verbatim.
    helper = _get_alkis()
    if not helper:
        # _get_alkis() logs the reason before returning nothing.
        return

    # The outcome is only reported through the log; nothing is returned.
    # Both messages are constant, so plain string literals are used.
    if helper.index_ok():
        gws.log.info('ALKIS indexes are ok')
    else:
        gws.log.info('ALKIS indexes are NOT ok')
def drop_index():
    """Remove the ALKIS search index"""

    # NOTE(review): the docstring above is presumably shown as CLI help,
    # so it is kept verbatim.
    helper = _get_alkis()
    if not helper:
        # _get_alkis() logs the reason before returning nothing.
        return

    # Credentials come from the shared CLI helper and are forwarded as-is.
    db_user, db_password = clihelpers.database_credentials()
    helper.drop_index(db_user, db_password)
def _get_alkis() -> t.Optional[alkis.Object]:
    """Load the configuration and return the ALKIS helper if it is usable.

    Returns:
        The first object that ``find_first('gws.ext.helper.alkis')`` yields
        on the result of ``gws.config.loader.load()``. None is returned,
        after an error has been logged, when no such object is found or
        when its ``has_source`` attribute is falsy.
    """
    root = gws.config.loader.load()
    helper = t.cast(alkis.Object, root.find_first('gws.ext.helper.alkis'))

    if not helper:
        gws.log.error('ALKIS helper is not configured')
        return None

    if helper.has_source:
        return helper

    gws.log.error('ALKIS source data not found')
    return None