Source code for eumap.datasets.lucas.analyze

'''
Analyze downloaded LUCAS samples.
'''

import csv
import json
import os
import sqlite3

from osgeo import ogr

from .exceptions import LucasDataError, LucasLoadError, LucasConfigError


class LucasClassAggregate:
    """Perform LC class aggregation.

    :param str gpkg_path: path to GPKG file created by :class:`.io.LucasIO.to_gpkg()`
    :param dict mappings: aggregation rules (mutually exclusive with ``mappings_file``)
    :param str mappings_file: path to JSON file with aggregation rules
        (mutually exclusive with ``mappings``)
    """
    def __init__(self, gpkg_path, mappings=None, mappings_file=None):
        if mappings is not None and mappings_file is not None:
            raise LucasConfigError('Only one of the parameters "mappings" and '
                                   '"mappings_file" should be defined')
        if mappings is None and mappings_file is None:
            raise LucasConfigError('One of the parameters "mappings" and '
                                   '"mappings_file" should be defined')

        self._gpkg_path = gpkg_path
        self.mappings = mappings
        self.mappings_file = mappings_file

    def _load_classes(self, classes):
        """Validate aggregation rules.

        :param dict classes: defined aggregation rules

        :return dict: validated dictionary mapping aggregated class names to
            lists of original LC1 codes
        """
        csv_lc1 = os.path.join(os.path.dirname(__file__), "lc1_codes.csv")
        with open(csv_lc1, newline='') as csv_f:
            layer_reader = csv.DictReader(csv_f, delimiter=";")
            # collect possible lc1 codes
            possible_codes = []
            for row in layer_reader:
                possible_codes.append(row["code"])

        try:
            values = list(classes.values())
            values_list = []
            for i in values:
                for j in i:
                    values_list.append(j)
                    if j not in possible_codes:
                        raise LucasDataError(f"Code {j} is not a Land Cover code!")
            if len(values_list) != len(set(values_list)):
                raise LucasDataError("Some code is used repeatedly!")
        except ValueError as e:
            raise LucasDataError(f"Invalid json file: {e}")

        return classes
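
    # Expected shape of the aggregation rules, whether passed as ``mappings``
    # or stored in a JSON file for ``mappings_file`` (an illustrative sketch;
    # the LC1 codes shown are examples, valid codes come from lc1_codes.csv):
    #
    #     {
    #         "woodland": ["C10", "C21", "C22"],
    #         "grassland": ["E10", "E20"]
    #     }
    #
    # Keys become the values written to the aggregated column(s); each LC1
    # code may appear in at most one list.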

    def apply(self):
        """Apply aggregation rules to the GPKG file."""
        if self.mappings is not None:
            self._apply_from_data()
        else:
            self._apply_from_file()

    def _apply_from_file(self):
        """Apply aggregation rules defined in a JSON file to the GPKG file."""
        try:
            with open(self.mappings_file) as json_file:
                self.mappings = json.load(json_file)
            self._apply_from_data()
        except FileNotFoundError as e:
            raise LucasLoadError(f"Invalid json file path: {e}")

    def _apply_from_data(self):
        """Apply aggregation rules to the GPKG file."""
        driver = ogr.GetDriverByName("GPKG")
        if os.path.exists(self._gpkg_path):
            gpkg = driver.Open(self._gpkg_path)
            layer = gpkg.GetLayer()
            layer_name = layer.GetName()
            # space-time layers carry one lc1_h_<year> column per survey year
            if layer_name[6:8] == "st":
                columns_h = []
                columns_a = []
                layer_definition = layer.GetLayerDefn()
                for i in range(layer_definition.GetFieldCount()):
                    attr = layer_definition.GetFieldDefn(i).GetName()
                    if attr in ["lc1_h_2006", "lc1_h_2009", "lc1_h_2012",
                                "lc1_h_2015", "lc1_h_2018"]:
                        columns_h.append(attr)
                        columns_a.append(attr.replace("h", "a"))
                if not columns_h:
                    raise LucasDataError("There is no lc1_h column in the GPKG file!")
            else:
                columns_h = ["lc1_h"]
                columns_a = ["lc1_a"]

            classes = self._load_classes(self.mappings)

            with sqlite3.connect(self._gpkg_path) as con:
                con.enable_load_extension(True)
                cur = con.cursor()
                cur.execute('SELECT load_extension("mod_spatialite");')
                for h_column, new_column in zip(columns_h, columns_a):
                    try:
                        cur.execute(
                            f"ALTER TABLE {layer_name} ADD COLUMN {new_column} TEXT")
                    except sqlite3.OperationalError:
                        cur.execute(f"UPDATE {layer_name} SET {new_column} = null")
                        print(f"Column {new_column} already exists in the table "
                              f"{layer_name} - it will be rewritten with the new "
                              f"data")
                    try:
                        cur.execute(f"CREATE INDEX IF NOT EXISTS {h_column}_idx "
                                    f"ON {layer_name}({h_column})")
                        for key in classes:
                            q_marks = ",".join("?" * len(classes[key]))
                            cur.execute(
                                f"UPDATE {layer_name} SET {new_column} = ? "
                                f"WHERE {h_column} IN ({q_marks})",
                                tuple([key] + classes[key]))
                    except sqlite3.OperationalError as e:
                        raise LucasDataError(f"Not possible to aggregate: {e}")
            con.close()
        else:
            raise LucasLoadError("GPKG file doesn't exist")
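
# Usage sketch (hypothetical file name; assumes the GPKG was produced by
# LucasIO.to_gpkg()):
#
#     mappings = {"woodland": ["C10", "C21", "C22"]}
#     aggregator = LucasClassAggregate("lucas_points.gpkg", mappings=mappings)
#     aggregator.apply()   # fills the aggregated lc1_a column(s)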

class LucasClassTranslate:
    """Perform LC class translation.

    :param str gpkg_path: path to GPKG file created by :class:`.io.LucasIO.to_gpkg()`
    :param str csvpath: path to CSV file with translation rules (defaults to
        the bundled LUCAS_unique_lc1_lu1_combinations_ALL.csv)
    """
    def __init__(self, gpkg_path, csvpath=None):
        if csvpath is None:
            csvpath = os.path.join(os.path.dirname(__file__),
                                   'LUCAS_unique_lc1_lu1_combinations_ALL.csv')

        self.gpkg_path = gpkg_path
        self.csv_path = csvpath
        self.source_col = 'CLC3'

        if not os.path.exists(self.gpkg_path):
            raise LucasLoadError("GPKG file doesn't exist")
        if not os.path.exists(self.csv_path):
            raise LucasLoadError("CSV file doesn't exist")

    def get_translations(self):
        """Get supported translations.

        :return list: names of columns that can serve as translation sources
        """
        with open(self.csv_path) as trans_data:
            return trans_data.readline().strip().split(',')

    def set_translations(self, source_col):
        """Set the source column used for translation.

        :param str source_col: name of the translation table
            (one of the values returned by :meth:`get_translations`)
        """
        self.source_col = source_col

    def apply(self):
        """Apply translation rules to the GPKG file."""
        driver = ogr.GetDriverByName("GPKG")
        gpkg = driver.Open(self.gpkg_path)
        layer = gpkg.GetLayer()
        layer_name = layer.GetName()

        with open(self.csv_path) as trans_data:
            header_list = trans_data.readline().strip().split(',')
            lc1_col = 'lc1'
            lu1_col = 'lu1'
            if self.source_col not in header_list:
                raise LucasConfigError(
                    f'Column {self.source_col} not found in the header. '
                    f'Only columns {header_list} found')
            new_col_ind = header_list.index(self.source_col)
            lc1_ind = header_list.index('LC1')
            lu1_ind = header_list.index('LU1')
            repre_ind = header_list.index('Representativeness')
            new_col = self.source_col.lower()

            with sqlite3.connect(self.gpkg_path) as con:
                con.enable_load_extension(True)
                cur = con.cursor()
                cur.execute('SELECT load_extension("mod_spatialite");')
                try:
                    cur.execute(
                        f"ALTER TABLE {layer_name} ADD COLUMN {new_col} TEXT")
                except sqlite3.OperationalError:
                    cur.execute(f"UPDATE {layer_name} SET {new_col} = null")
                    print(f"Column {new_col} already exists in the table "
                          f"{layer_name} - it will be rewritten with the new "
                          f"data")

                sql_query = f"UPDATE {layer_name} SET {new_col} = ? WHERE " \
                            f"{lc1_col} = ? AND {lu1_col} = ?"
                # only rows flagged as representative define a translation
                for line in trans_data.readlines():
                    line_split = line.strip().split(',')
                    if line_split[repre_ind] == '1':
                        cur.execute(sql_query,
                                    (line_split[new_col_ind],
                                     line_split[lc1_ind],
                                     line_split[lu1_ind]))
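
# Usage sketch for translation (hypothetical file name; the source column must
# be one of the values reported by get_translations(), e.g. "CLC3"):
#
#     translator = LucasClassTranslate("lucas_points.gpkg")
#     print(translator.get_translations())   # available translation tables
#     translator.set_translations("CLC3")
#     translator.apply()                     # fills the clc3 column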