'''
Overlay and spatial prediction fully compatible with ``scikit-learn``.
'''
from typing import List, Union, Callable
from abc import ABC, abstractmethod
from enum import Enum
from concurrent.futures import as_completed, ThreadPoolExecutor
import concurrent.futures
import traceback
import joblib
import math
import time
import uuid
import gc
import re
import os
from sklearn.model_selection import cross_val_predict, GridSearchCV, KFold, BaseCrossValidator
from sklearn.utils.validation import has_fit_parameter
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.impute import SimpleImputer
from sklearn.base import BaseEstimator, is_classifier, is_regressor
from sklearn.pipeline import Pipeline
from sklearn import preprocessing
from sklearn import metrics
from rasterio.windows import Window
from pandas import DataFrame
from pathlib import Path
import geopandas as gpd
import pandas as pd
import numpy as np
import rasterio
from . import parallel
from .misc import ttprint, find_files
from .raster import read_rasters, write_new_raster
import warnings
# warnings.filterwarnings('ignore') # should not ignore all warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['AUTOGRAPH_VERBOSITY'] = '0'
_automl_enabled = False
try:
from autosklearn.classification import AutoSklearnClassifier
_automl_enabled = True
except ImportError:
pass
DEFAULT = {
'META_ESTIMATOR': LogisticRegression(),
'ESTIMATOR': RandomForestClassifier(),
'CV': KFold(5)
}
[docs]def build_ann(
input_shape,
output_shape,
n_layers = 3,
n_neurons = 32,
activation = 'relu',
dropout_rate = 0.0,
learning_rate = 0.0001,
output_activation = 'softmax',
loss = 'categorical_crossentropy'
):
"""
Helper function to create a pretty standard Artificial Neural
Network-ANN using ``tensorflow``. It's based in a ``Sequential``
model, which connects multiple hidden layers
(``Dense=>Dropout=>BatchNormalization``) and uses a ``Nadam``
optimizer. Developed to be used together with ``KerasClassifier``.
:param input_shape: The input data shape.
:param output_shape: The output data shape.
:param n_layers: Number of hidden layers.
:param n_neurons: Number of neurons for the hidden layers.
:param activation: Activation function for the input and hidden layers.
:param dropout_rate: Dropout rate for the ``BatchNormalization``.
:param learning_rate: Learning rate for the optimized.
:param output_activation: Activation function for the output layer.
:param loss: Loss function used for the Optimizer.
:returns: The ANN model
:rtype: Sequential
Examples
========
>>> from eumap.mapper import build_ann
>>> from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
>>>
>>> ann = KerasClassifier(build_ann, input_shape=(-1, 180), output_shape=33,
>>> epochs=3, batch_size=64, shuffle=True, verbose=1)
"""
try:
import tensorflow as tf
tf.autograph.set_verbosity(0)
import logging
logging.getLogger("tensorflow").setLevel(logging.ERROR)
from tensorflow.keras.layers import Dense, BatchNormalization, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Nadam
except ImportError as e:
warnings.warn('build_ann requires tensorflow>=2.5.0')
model = Sequential()
model.add(Dense(input_shape, activation=activation))
for i in range(0, n_layers):
model.add(Dense(n_neurons, activation=activation))
model.add(Dropout(dropout_rate))
model.add(BatchNormalization())
model.add(Dense(output_shape, activation=output_activation))
model.compile(loss=loss,
optimizer=Nadam(learning_rate=learning_rate),
)
return model
[docs]class PredictionStrategyType(Enum):
"""
Strategy to read multiple raster files during the prediction
"""
Lazy = 1 #: Load one year while predict other.
Eager = 2 #: First load all years, then predict all.
[docs]class LandMapper():
"""
Spatial prediction implementation based in supervised machine
learning models and point samples.
It's fully compatible with ``scikit-learn`` [1] supporting:
1. Classification models,
2. Seamless training using point samples,
3. Data imputation,
4. Hyper-parameter optimization,
5. Feature selection,
6. Ensemble machine learning (EML) and prediction uncertainty,
7. AutoML through ``auto-sklearn`` [2],
8. Accuracy assessment through cross-validation,
9. Seamless raster prediction (read and write).
:param points: Point samples used to train the ML model. It supports ``pandas.DataFrame`` and
a path for plain CSV ``(*.csv)`` or compressed csv file ``(*.gz)``, which are read
through ``pandas.read_csv`` [3]. All the other extensions are read by ``geopandas`` as GIS vector files [4].
:param target_col: Column name used to retrieve the target values for the training.
:param feat_cols: List of column names used to retrieve the feature/covariates for the training.
:param feat_col_prfxs: List of column prefixes used to derive the ``feat_cols`` list, avoiding to provide
dozens/hundreds of column names.
:param weight_col: Column name used to retrieve the ``sample_weight`` for the training.
:param nodata_imputer: Transformer used to input missing values filling all ``np.nan`` in the point
samples. All ``sklearn.impute`` classes are supported [1].
:param estimator: The ML model used by the class. The default model is a ``RandomForestClassifier``,
however all the ``sklearn`` model are supported [1]. For ``estimator=None`` it tries to use ``auto-sklearn``
to find the best model and hyper-parameters [2].
:param estimator_list: A list of models used by the EML implementation. The models output are used to
feed the ``meta_estimator`` model and to derive the prediction uncertainty. This argument has
prevalence over ``estimator``.
:param meta_estimator: Model used to derive the prediction output in the EML implementation. The default model
here is a ``LogisticRegression``, however all the ``sklearn`` model are supported [1].
:param hyperpar_selection: Hyper-parameter optimizer used by ``estimator`` model.
:param hyperpar_selection_list: A list of hyper-parameter optimizers used by ``estimator_list`` models, provided
in the same order. This argument has prevalence over ``hyperpar_selection``.
:param hyperpar_selection_meta: Hyper-parameter optimizer used by ``meta_estimator`` model.
:param feature_selection: Feature selection algorithm used by ``estimator`` model.
:param feature_selections_list: A list of feature selection algorithm used by ``estimator_list`` models, provided
in the same order. This argument has prevalence over ``feature_selection``.
:param cv: Cross validation strategy used by all models. The default strategy is a ``5-Fold cv``,
however all the ``sklearn`` model are supported [1].
:param cv_njobs: Number of CPU cores to be used in parallel during the cross validation.
:param cv_group_col: Column name used to split the train/test set during the cross validation. Use this argument
to perform a ``spatial CV`` by block/tiles.
:param min_samples_per_class: Minimum percentage of samples according to ``target_col`` to keep the class in the
training.
:param pred_method: Use ``predict_prob`` to predict probabilities and uncertainty, otherwise it predicts only
the dominant class.
:param apply_corr_factor: Apply a correction factor (``rmse / averaged_sd``) in the prediction uncertainty output.
:param verbose:bool: Use ``True`` to print the progress of all steps.
:param \*\*autosklearn_kwargs: Named arguments supported by ``auto-sklearn`` [2].
For **usage examples** access the ``eumap`` tutorials [5,6].
References
==========
[1] `Sklearn API Reference <https://scikit-learn.org/stable/modules/classes.html>`_
[2] `Auto-sklearn API <https://automl.github.io/auto-sklearn/master/api.html>`_
[3] `Pandas read_csv function <https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html>`_
[4] `Geopandas read_file function <https://geopandas.org/docs/reference/api/geopandas.read_file.html>`_
[5] `Land Cover Mapping <../notebooks/03_landcover_mapping.html>`_
[6] `Land Cover Mapping (Advanced) <../notebooks/04_landcover_mapping_advanced.html>`_
"""
def __init__(
self,
points:Union[DataFrame, Path],
target_col:str,
feat_cols:Union[List, None] = [],
feat_col_prfxs:Union[List, None] = [],
weight_col:Union[str, None] = None,
nodata_imputer:Union[BaseEstimator, None] = None,
estimator:Union[BaseEstimator, None] = DEFAULT['ESTIMATOR'],
estimator_list:Union[List[BaseEstimator], None] = None,
meta_estimator:BaseEstimator = DEFAULT['META_ESTIMATOR'],
hyperpar_selection:Union[BaseEstimator, None] = None,
hyperpar_selection_list:Union[BaseEstimator, None] = None,
hyperpar_selection_meta:Union[List[BaseEstimator], None] = None,
feature_selection:Union[BaseEstimator, None] = None,
feature_selections_list:Union[BaseEstimator, None] = None,
cv:BaseCrossValidator = DEFAULT['CV'],
cv_njobs:int = 1,
cv_group_col:str = None,
min_samples_per_class:float = 0,
pred_method:str = 'predict',
verbose:bool = True,
apply_corr_factor:bool = False,
**autosklearn_kwargs
):
self.verbose = verbose
self.pts = self._pts(points)
self.target_col = target_col
self.feat_col_prfxs = feat_col_prfxs
self.feature_cols = self._feature_cols(feat_cols, feat_col_prfxs)
self.nodata_imputer = nodata_imputer
self.is_automl_estimator = (_automl_enabled and estimator is None)
if self.is_automl_estimator:
self.estimator_list = [ AutoSklearnClassifier(**autosklearn_kwargs) ]
else:
self.estimator_list = self._set_list(estimator, estimator_list, 'estimator')
self.hyperpar_selection_list = self._set_list(hyperpar_selection, hyperpar_selection_list)
self.feature_selections_list = self._set_list(feature_selection, feature_selections_list)
self.meta_estimator, self.meta_features = self._meta_estimator(meta_estimator)
self.hyperpar_selection_meta = hyperpar_selection_meta
self.cv = cv
self.cv_njobs = cv_njobs
self.pred_method = self._pred_method(pred_method)
self._min_samples_restriction(min_samples_per_class)
self.features = np.ascontiguousarray(self.pts[self.feature_cols].to_numpy(), dtype=np.float32)
self.target = target = np.ascontiguousarray(self.pts[self.target_col].to_numpy(), dtype=np.float32)
self._target_transformation()
self.samples_weight = self._get_column_if_exists(weight_col, 'weight_col')
self.cv_groups = self._get_column_if_exists(cv_group_col, 'cv_group_col')
self.corr_factor = 1
self.apply_corr_factor = apply_corr_factor
if self.nodata_imputer is not None:
self.features = self._impute_nodata(self.features, fit_and_tranform = True)
def _pts(self, points):
if isinstance(points, Path):
suffix = points.suffix
if suffix == '.csv':
return pd.read_csv(points)
elif suffix == '.gz':
return pd.read_csv(points, compression='gzip')
else:
return gpd.read_file(points)
elif isinstance(points, DataFrame):
return points
else:
return points
def _feature_cols(self, feat_cols, feat_col_prfxs):
feature_cols = []
if len(feat_cols) > 0:
feature_cols = list(self.pts.columns[self.pts.columns.isin(feat_cols)])
elif len(feat_col_prfxs) > 0:
for feat_prfx in feat_col_prfxs:
feature_cols += list(self.pts.columns[self.pts.columns.str.startswith(feat_prfx)])
else:
raise Exception(f'You should provide at least one of these: feat_cols or feat_col_prfxs.')
if len(feature_cols) == 0:
raise Exception(f'None feature was found. Check the provided feat_cols or feat_col_prfxs.')
return feature_cols
def _get_column_if_exists(self, column_name, param_name):
if column_name is not None:
if column_name in self.pts.columns:
return self.pts[column_name]
else:
self._verbose(f'Ignoring {param_name}, because {column_name} column not exists.')
else:
return None
# features_weight
def _set_list(self, obj, obj_list, var_name = None):
empty_obj = (obj is None)
empty_list = (obj_list is None or len(obj_list) == 0)
if not empty_list:
return obj_list
elif not empty_obj and empty_list:
return [obj]
elif var_name is not None:
raise Exception(f'You should provide at least one of these: {var_name} or {var_name}_list.')
else:
return []
def _meta_estimator(self, meta_estimator):
if len(self.estimator_list) > 1:
return meta_estimator, []
else:
return None, None
def _pred_method(self, pred_method):
if self.meta_estimator is not None and is_classifier(self.meta_estimator):
return 'predict_proba'
else:
return pred_method
def _min_samples_restriction(self, min_samples_per_class):
classes_pct = (self.pts[self.target_col].value_counts() / self.pts[self.target_col].count())
rows_to_keep = self.pts[self.target_col].isin(classes_pct[classes_pct >= min_samples_per_class].axes[0])
nrows, _ = self.pts[~rows_to_keep].shape
if nrows > 0:
removed_cls = self.pts[~rows_to_keep][self.target_col].unique()
self.pts = self.pts[rows_to_keep]
self._verbose(f'Removing {nrows} samples ({self.target_col} in {removed_cls}) '+
f'due min_samples_per_class condition (< {min_samples_per_class})')
def _target_transformation(self):
if self._is_classifier():
self._verbose(f"Transforming {self.target_col}:")
self.target_le = preprocessing.LabelEncoder()
# Change to starting from 1
self.target = self.target_le.fit_transform(self.target)
self.target_classes = {
'original': self.target_le.classes_,
'transformed': self.target_le.transform(self.target_le.classes_)
}
self._verbose(f" -Original classes: {self.target_classes['original']}")
self._verbose(f" -Transformed classes: {self.target_classes['transformed']}")
def _impute_nodata(self, data, fit_and_tranform = False):
nodata_idx = self._nodata_idx(data)
num_nodata_idx = np.sum(nodata_idx)
pct_nodata_idx = num_nodata_idx / data.size * 100
if (num_nodata_idx > 0):
self._verbose(f'Filling the missing values ({pct_nodata_idx:.2f}% / {num_nodata_idx} values)...')
if fit_and_tranform:
data = self.nodata_imputer.fit_transform(data)
else:
data = self.nodata_imputer.transform(data)
return data
def _nodata_idx(self, data):
if np.isnan(self.nodata_imputer.missing_values):
return np.isnan(data)
else:
return (data == self.nodata_imputer.missing_values)
def _verbose(self, *args, **kwargs):
if self.verbose:
ttprint(*args, **kwargs)
def _best_params(self, hyperpar_selection):
means = hyperpar_selection.cv_results_['mean_test_score']
stds = hyperpar_selection.cv_results_['std_test_score']
for mean, std, params in zip(means, stds, hyperpar_selection.cv_results_['params']):
self._verbose(f" {mean:.5f} (+/-{2*std:.05f}) from {params}")
self._verbose(f'Best: {hyperpar_selection.best_score_:.5f} using {hyperpar_selection.best_params_}')
return hyperpar_selection.best_params_
def _class_optimal_th(self, curv_precision, curv_recall, curv_th):
# Removing elements where the precision or recall are zero
nonzero_mask = np.logical_and((curv_precision != 0.0), (curv_recall != 0.0))
optimal_idx = np.argmax(1 - np.abs(curv_precision[nonzero_mask] - curv_recall[nonzero_mask]))
return curv_recall[optimal_idx], curv_precision[optimal_idx], curv_th[optimal_idx]
def _classification_report_prob(self):
classes, cnt = np.unique(self.target, return_counts=True)
me = {
'log_loss': {},
'pr_auc': {},
'support': {},
'opti_th': {},
'opti_recall': {},
'opti_precision': {},
'curv_recall': {},
'curv_precision': {},
'curv_th': {},
}
for c in classes:
c_mask = (self.target == c)
me['log_loss'][c] = metrics.log_loss(self.target[c_mask], self.eval_pred[c_mask], labels=classes)
for c_idx, c in enumerate(classes):
me['support'][c] = cnt[c_idx]
c_targ = (self.target == c).astype(int)
c_pred = self.eval_pred[:, c_idx]
curv_precision, curv_recall, curv_th = metrics.precision_recall_curve(c_targ,c_pred)
me['curv_precision'][c], me['curv_recall'][c], me['curv_th'][c] = curv_precision, curv_recall, curv_th
me['pr_auc'][c] = metrics.auc(me['curv_recall'][c], me['curv_precision'][c])
me['opti_precision'][c], me['opti_recall'][c], me['opti_th'][c] = self._class_optimal_th(curv_precision, curv_recall, curv_th)
report = ' log_loss pr_auc optimal_prob optimal_precision optimal_recall support\n'
report += '\n'
for c in classes:
report += f"{int(c)} "
report += f"{me['log_loss'][c]:.4f} "
report += f"{me['pr_auc'][c]:.4f} "
report += f"{me['opti_th'][c]:.4f} "
report += f"{me['opti_precision'][c]:.4f} "
report += f"{me['opti_recall'][c]:.4f} "
report += f"{me['support'][c]}\n"
report += '\n'
report += f"Total "
report += f" {np.sum(cnt)}\n"
self.prob_metrics = me
return report
def _is_classifier(self):
if self.meta_estimator is not None:
return is_classifier(self.meta_estimator)
else:
return is_classifier(self.estimator_list[0])
def _calc_eval_metrics(self):
self.eval_metrics = {}
if self.pred_method == 'predict':
if self._is_classifier():
self.eval_metrics['confusion_matrix'] = metrics.confusion_matrix(self.target, self.eval_pred)
self.eval_metrics['overall_acc'] = metrics.accuracy_score(self.target, self.eval_pred)
self.eval_report = metrics.classification_report(self.target, self.eval_pred)
else:
self.eval_metrics['r2'] = metrics.r2_score(self.target, self.eval_pred)
self.eval_metrics['rmse'] = metrics.mean_squared_error(self.target, self.eval_pred, squared=False)
elif self.pred_method == 'predict_proba':
self.eval_metrics['log_loss'] = metrics.log_loss(self.target, self.eval_pred)
self.eval_report = self._classification_report_prob()
def _fit_params(self, estimator):
if isinstance(estimator, Pipeline):
return {'estimator__sample_weight': self.samples_weight}
if self.is_automl_estimator:
ttprint('LandMapper is using AutoSklearnClassifier, which not supports fit_params (ex: sample_weight)')
return {}
elif has_fit_parameter(estimator, "sample_weight"):
return {'sample_weight': self.samples_weight}
else:
return {}
def _is_catboost_model(self,estimator):
try:
from catboost import CatBoostClassifier, CatBoostRegressor
except ImportError as e:
return False
if isinstance(estimator,Pipeline):
return isinstance(estimator,Pipeline) and (
isinstance(estimator['estimator'], CatBoostClassifier)
or
isinstance(estimator['estimator'], CatBoostRegressor)
)
else:
return isinstance(estimator,CatBoostClassifier) or isinstance(estimator,CatBoostRegressor)
def _is_keras_model(self, estimator):
try:
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier, KerasRegressor
return isinstance(estimator, Pipeline) and (
isinstance(estimator['estimator'], KerasClassifier)
or
isinstance(estimator['estimator'], KerasRegressor)
)
except ImportError as e:
return False
def _binarizer_target_if_needed(self, estimator):
if self.pred_method == 'predict_proba' and self._is_keras_model(estimator):
le = preprocessing.LabelBinarizer()
target = le.fit_transform(self.target)
return target
else:
return self.target
def _do_hyperpar_selection(self, hyperpar_selection, estimator, features):
estimator_name = type(estimator).__name__
self._verbose(f'Optimizing hyperparameters for {estimator_name}')
cv_njobs = self.cv_njobs
if isinstance(self.cv, int):
cv_njobs = self.cv
elif isinstance(self.cv, BaseCrossValidator):
cv_njobs = self.cv.n_splits
hyperpar_selection.set_params(
cv = self.cv,
refit = False,
n_jobs = self.cv_njobs
)
hyperpar_selection.fit(features, self.target, groups=self.cv_groups, **self._fit_params(estimator))
estimator.set_params(**self._best_params(hyperpar_selection))
def _do_cv_prediction(self, estimator, features):
target = self.target
cv_njobs = self.cv_njobs
target = self._binarizer_target_if_needed(estimator)
if isinstance(self.cv, int):
cv_njobs = self.cv
elif isinstance(self.cv, BaseCrossValidator):
cv_njobs = self.cv.n_splits
return cross_val_predict(estimator, features, target, method=self.pred_method, n_jobs=self.cv_njobs, \
cv=self.cv, groups=self.cv_groups, verbose=self.verbose, fit_params = self._fit_params(estimator))
def _do_cv_evaluation(self):
self._verbose(f'Calculating evaluation metrics')
if self.meta_estimator is not None:
self.eval_pred = self._do_cv_prediction(self.meta_estimator, self.meta_features)
if self.apply_corr_factor:
self.corr_factor = self._correction_factor()
self._verbose(f'Correction factor equal to {self.corr_factor:.6f}')
else:
self.eval_pred = self._do_cv_prediction(self.estimator_list[0], self.features)
self._calc_eval_metrics()
def _calc_meta_features(self):
self._verbose(f'Calculating meta-features')
for estimator in self.estimator_list:
self.meta_features.append(self._do_cv_prediction(estimator, self.features))
if self._is_classifier():
self.meta_features = np.concatenate(self.meta_features, axis=1)
else:
self.meta_features = np.stack(self.meta_features, axis=1)
print(self.meta_features.shape)
self._verbose(f' Meta-features shape: {self.meta_features.shape}')
def _correction_factor(self, per_class = False):
n_estimators = len(self.estimator_list)
n_meta_features = self.meta_features.shape[1]
n_classes = int(n_meta_features / n_estimators)
meta_features = self.meta_features.reshape(-1, n_classes, n_estimators)
avg_std_axis, multioutput = None, 'uniform_average'
if per_class:
avg_std_axis, multioutput = (0, 'raw_values')
prep = preprocessing.LabelBinarizer()
target_bin = prep.fit_transform(self.target)
avg_std = np.mean(np.std(meta_features, axis=-1), axis=avg_std_axis)
rmse = metrics.mean_squared_error(target_bin, self.eval_pred, multioutput=multioutput, squared=False)
# See https://stats.stackexchange.com/questions/242787/how-to-interpret-root-mean-squared-error-rmse-vs-standard-deviation/375674
return (rmse / avg_std)
def _feature_idx(self, fn_layer):
return self.feature_cols.index(fn_layer.stem)
def _reorder_layers(self, layernames, dict_layers_newnames, input_data, raster_files):
sorted_input_data = []
sorted_raster_files = []
for feature_col in self.feature_cols:
try:
idx = layernames.index(feature_col)
sorted_input_data.append(input_data[:,:,idx])
if idx < len(raster_files):
sorted_raster_files.append(raster_files[idx])
except:
raise Exception(f"The feature {feature_col} was not provided")
return np.stack(sorted_input_data, axis=2), sorted_raster_files
def _read_layers(self, dirs_layers:List = [], fn_layers:List = [], spatial_win = None,
dtype = 'Float32', allow_additional_layers = False, inmem_calc_func = None, dict_layers_newnames={},
n_jobs_io = 5, verbose_renaming=True):
raster_data, raster_files = read_rasters(raster_dirs=dirs_layers, raster_files=fn_layers, \
spatial_win=spatial_win, dtype=dtype, n_jobs=n_jobs_io, \
verbose=self.verbose,)
feature_cols_set = set(self.feature_cols)
layernames = []
for raster_file in raster_files:
layername = raster_file.stem
for newname in dict_layers_newnames.keys():
layername_aux = layername
layername = re.sub(dict_layers_newnames[newname], newname, layername)
if layername_aux != layername and verbose_renaming:
self._verbose(f'Renaming {layername_aux} to {layername}')
if not allow_additional_layers and layername not in feature_cols_set:
raise Exception(f"Layer {layername} does not exist as feature_cols.\nUse dict_layers_newnames param to match their names")
layernames.append(layername)
if inmem_calc_func is not None:
layernames, raster_data = inmem_calc_func(layernames, raster_data, spatial_win)
return self._reorder_layers(layernames, dict_layers_newnames, raster_data, raster_files)
def _write_layer(self, fn_base_layer, fn_output, input_data_shape, output_data,
nan_mask, separate_probs = True, spatial_win = None, scale=1,
dtype = 'float32', new_suffix = None):
if len(output_data.shape) < 2:
n_classes = 1
else:
_, n_classes = output_data.shape
if not isinstance(output_data, np.floating):
output_data = output_data.astype('float32')
output_data[nan_mask] = np.nan
output_data = output_data.reshape(input_data_shape[0], input_data_shape[1], n_classes)
output_data = (output_data * scale).astype(dtype)
if new_suffix is not None:
out_ext = Path(fn_output).suffix
fn_output = Path(str(fn_output).replace(out_ext, new_suffix + out_ext))
fn_output_list = []
if separate_probs:
for i in range(0,n_classes):
out_ext = Path(fn_output).suffix
fn_output_c = Path(str(fn_output).replace(out_ext, f'_b{i+1}' + out_ext))
write_new_raster(fn_base_layer, fn_output_c, output_data[:,:,i:i+1],
spatial_win = spatial_win, dtype=dtype, nodata=255)
fn_output_list += [ fn_output_c ]
else:
write_new_raster(fn_base_layer, fn_output, output_data,
spatial_win = spatial_win, dtype=dtype, nodata=255)
fn_output_list += [ fn_output ]
return fn_output_list
def _write_layers(self, fn_base_layer, fn_output, input_data_shape, pred_result,
pred_uncer, nan_mask, spatial_win, separate_probs, hard_class, dtype = 'float32'):
fn_out_files = []
if self.pred_method != 'predict_proba':
separate_probs = False
scale = 1
else:
scale = 100
fn_pred_files = self._write_layer(fn_base_layer, fn_output, input_data_shape, pred_result, nan_mask,
separate_probs = separate_probs, spatial_win = spatial_win, dtype = dtype, scale = scale)
fn_out_files += fn_pred_files
if self.pred_method == 'predict_proba':
if pred_uncer is not None:
fn_uncer_files = self._write_layer(fn_base_layer, fn_output, input_data_shape, pred_uncer, nan_mask,
separate_probs = separate_probs, spatial_win = spatial_win, dtype = 'uint8', scale=100, new_suffix = '_uncertainty')
fn_out_files += fn_uncer_files
if hard_class:
pred_argmax = np.argmax(pred_result, axis=1)
pred_argmax_prob = np.take_along_axis(pred_result, np.stack([pred_argmax], axis=1), axis=1)
if pred_uncer is not None and pred_uncer.ndim > 2:
pred_argmax_uncer = np.take_along_axis(pred_uncer, np.stack([pred_argmax], axis=1), axis=1)
pred_argmax += 1
fn_hcl_file = self._write_layer(fn_base_layer, fn_output, input_data_shape, pred_argmax, nan_mask,
separate_probs = False, spatial_win = spatial_win, dtype = dtype, new_suffix = '_hcl')
fn_out_files += fn_hcl_file
fn_hcl_prob_files = self._write_layer(fn_base_layer, fn_output, input_data_shape, pred_argmax_prob, nan_mask,
separate_probs = False, spatial_win = spatial_win, dtype = 'uint8', scale=100, new_suffix = '_hcl_prob')
fn_out_files += fn_hcl_prob_files
if pred_uncer is not None and pred_uncer.ndim > 2:
fn_hcl_uncer_file = self._write_layer(fn_base_layer, fn_output, input_data_shape, pred_argmax_uncer, nan_mask,
separate_probs = False, spatial_win = spatial_win, dtype = 'uint8', scale=100, new_suffix = '_hcl_uncertainty')
fn_out_files += fn_hcl_uncer_file
return fn_out_files
[docs] def train(self):
"""
Train the ML/EML model according to the class arguments.
"""
# Hyperparameter optization for all estimators
for hyperpar_selection, estimator in zip(self.hyperpar_selection_list, self.estimator_list):
if hyperpar_selection is not None:
self._do_hyperpar_selection(hyperpar_selection, estimator, self.features)
# Meta-features calculation to feed the meta-estimator
if self.meta_estimator is not None:
self._calc_meta_features()
# Hyperparameter optization for the meta-estimator
if self.hyperpar_selection_meta is not None:
self._do_hyperpar_selection(self.hyperpar_selection_meta, self.meta_estimator, self.meta_features)
# CV calculation using the final estimator
self._do_cv_evaluation()
# Training the final estimators
for estimator in self.estimator_list:
estimator_name = type(estimator).__name__
self._verbose(f'Training {estimator_name} using all samples')
target = self._binarizer_target_if_needed(estimator)
estimator.fit(self.features, target, **self._fit_params(estimator))
# Training the final meta-estimator
if self.meta_estimator is not None:
self._verbose(f'Training meta-estimator using all samples')
self.meta_estimator.fit(self.meta_features, self.target, **self._fit_params(self.meta_estimator))
def _relative_entropy(self, pred_result):
_, n_classes = pred_result.shape
classes_proba = np.maximum(pred_result, 1e-15)
relative_entropy_pred = -1 * classes_proba * np.log2(classes_proba)
relative_entropy_pred = relative_entropy_pred.sum(axis=1) / np.log2(n_classes)
return relative_entropy_pred
def _predict(self, input_data):
start = time.time()
estimators_pred = []
for estimator in self.estimator_list:
estimator_name = type(estimator).__name__
self._verbose(f'Executing {estimator_name}')
if self._is_keras_model(estimator):
n_elements, _ = input_data.shape
pred_batch_size = int(n_elements/2)
self._verbose(f'###batch_size={pred_batch_size}')
estimator['estimator'].set_params(batch_size=pred_batch_size)
if self._is_catboost_model(estimator):
from catboost import Pool, FeaturesData
start_featuresdata = time.time()
input_data = FeaturesData(num_feature_data=input_data.astype(np.float32))
#ttprint(f"creating FeaturesData took {(time.time() - start_featuresdata):.2f} seconds")
start_pool = time.time()
input_data = Pool(data=input_data)
#ttprint(f"creating Pool from FeaturesData took {(time.time() - start_pool):.2f} seconds")
start_pred = time.time()
estimator_pred_method = getattr(estimator, self.pred_method)
if self._is_keras_model(estimator):
estimators_pred.append(estimator_pred_method(input_data, batch_size=pred_batch_size))
else:
estimators_pred.append(estimator_pred_method(input_data))
self._verbose(f'{estimator_name} prediction time: {(time.time() - start_pred):.2f} seconds')
self._verbose(f'Total time: {(time.time() - start):.2f} seconds')
if self.meta_estimator is None:
estimator_pred = estimators_pred[0]
relative_entropy_pred = None
#Produce almost the same data of the invese of the
#highest probability (hcl_prob.tif)
#if self.pred_method == 'predict_proba':
# relative_entropy_pred = self._relative_entropy(estimator_pred)
# relative_entropy_pred = relative_entropy_pred.astype('float32')
return estimator_pred.astype('float32'), relative_entropy_pred
else:
start = time.time()
meta_estimator_name = type(self.meta_estimator).__name__
self._verbose(f'Executing {meta_estimator_name}')
if self._is_classifier():
input_meta_features = np.concatenate(estimators_pred, axis=1)
std_meta_features = np.std(np.stack(estimators_pred, axis=2), axis=2)
else:
input_meta_features = np.stack(estimators_pred, axis=1)
std_meta_features = np.std(input_meta_features, axis=1)
meta_estimator_pred_method = getattr(self.meta_estimator, self.pred_method)
meta_estimator_pred = meta_estimator_pred_method(input_meta_features)
self._verbose(f'{meta_estimator_name} prediction time: {(time.time() - start):.2f} segs')
return meta_estimator_pred.astype('float32'), self.corr_factor * std_meta_features.astype('float32')
[docs] def predict_points(self,
input_points:DataFrame
):
"""
Predict point samples. It uses the ``feature_cols`` to retrieve the
input feature/covariates.
:param input_points: New set of point samples to be predicted.
:returns: The prediction result and the prediction uncertainty (only for EML)
:rtype: Tuple[Numpy.array, Numpy.array]
"""
input_data = np.ascontiguousarray(input_points[self.feature_cols].to_numpy(), dtype=np.float32)
n_points, _ = input_data.shape
self._verbose(f'Predicting {n_points} points')
return self._predict(input_data)
[docs] def predict(self,
dirs_layers:List = [],
fn_layers:List = [],
fn_output:str = None,
spatial_win:Window = None,
dtype = 'float32',
fill_nodata:bool = False,
separate_probs:bool = True,
hard_class:bool = True,
inmem_calc_func:Callable = None,
dict_layers_newnames:set = {},
allow_additional_layers:bool = False,
n_jobs_io:int = 4,
verbose_renaming:bool = True,
):
"""
Predict raster data. It matches the raster filenames with the input feature/covariates
used by training.
:param dirs_layers: A list of folders where the raster files are located.
:param fn_layers: A list with the raster paths. Provide it and the ``dirs_layers`` is ignored.
:param fn_output: File path where the prediction result is saved. For multiple outputs (probabilities,
uncertainty) the same location is used, adding specific suffixes in the provided file path.
:param spatial_win: Read the data and predict according to the spatial window. By default is ``None``,
which means all the data is read and predict.
:param dtype: Convert the read data to specific ``dtype``. For ``Float*`` the ``nodata`` values are
converted to ``np.nan``.
:param fill_nodata: Use the ``nodata_imputer`` to fill all ``np.nan`` values. By default is ``False``
because for almost all the cases it's preferable use the ``eumap.gapfiller module`` to perform this task.
:param separate_probs: Use ``True`` to save the predict probabilities in a separate raster, otherwise it's
write as multiple bands of a single raster file. For ``pred_method='predict'`` it's ignored.
:param hard_class: When ``pred_method='predict_proba'`` use ``True`` to save the predict dominant
class ``(*_hcl.tif)``, the probability ``(*_hcl_prob.tif)`` and uncertainty ``(*_hcl_uncertainty.tif)``
values of each dominant class.
:param inmem_calc_func: Function to be executed before the prediction. Use it to derive covariates/features
on-the-fly, calculating in memory, for example, a NDVI from the red and NIR bands.
:param dict_layers_newnames: A dictionary used to change the raster filenames on-the-fly. Use it to match
the column names for the point samples with different raster filenames.
:param allow_additional_layers: Use ``False`` to throw a ``Exception`` if a read raster is not present
in ``feature_cols``.
:param n_jobs_io: Number of parallel jobs to read the raster files.
:param verbose_renaming: show which raster layers are renamed
:returns: List with all the raster files produced as output.
:rtype: List[Path]
"""
input_data, fn_layers = self._read_layers(dirs_layers, fn_layers, spatial_win, \
dtype=dtype, inmem_calc_func=inmem_calc_func, dict_layers_newnames=dict_layers_newnames, \
allow_additional_layers=allow_additional_layers, n_jobs_io=n_jobs_io, \
verbose_renaming=verbose_renaming)
x_size, y_size, n_features = input_data.shape
input_data_shape = input_data.shape
input_data = input_data.reshape(-1, input_data_shape[2])
nan_mask = None
if fill_nodata:
input_data = self.fill_nodata(input_data)
else:
nan_mask = np.any(np.isnan(input_data), axis=1)
input_data[nan_mask,:] = 0
fn_base_layer = fn_layers[0]
pred_result, pred_uncer = self._predict(input_data)
fn_out_files = self._write_layers(fn_base_layer, fn_output, input_data_shape, pred_result, \
pred_uncer, nan_mask, spatial_win, separate_probs, hard_class, dtype = dtype)
fn_out_files.sort()
return fn_out_files
[docs] def predict_multi(self,
dirs_layers_list:List[List] = [],
fn_layers_list:List[List] = [],
fn_output_list:List[List] = [],
spatial_win:Window = None,
dtype:str = 'float32',
fill_nodata:bool = False,
separate_probs:bool = True,
hard_class:bool = True,
inmem_calc_func:Callable = None,
dict_layers_newnames_list:List[set] = [],
allow_additional_layers=False,
prediction_strategy_type = PredictionStrategyType.Lazy
):
"""
Predict multiple raster data. It matches the raster filenames with the input feature/covariates
used by training.
:param dirs_layers_list: A list of list containing the folders where the raster files are located.
:param fn_layers_list: A list of list containing the raster paths. Provide it and the
``dirs_layers_list`` is ignored.
:param fn_output_list: A list of file path where the prediction result is saved. For multiple outputs (probabilities,
uncertainty) the same location is used, adding specific suffixes in the provided file path.
:param spatial_win: Read the data and predict according to the spatial window. By default is ``None``,
which means all the data is read and predict.
:param dtype: Convert the read data to specific ``dtype``. For ``Float*`` the ``nodata`` values are
converted to ``np.nan``.
:param fill_nodata: Use the ``nodata_imputer`` to fill all ``np.nan`` values. By default is ``False``
because for almost all the cases it's preferable use the ``eumap.gapfiller module`` to perform this task.
:param separate_probs: Use ``True`` to save the predict probabilities in a separate raster, otherwise it's
write as multiple bands of a single raster file. For ``pred_method='predict'`` it's ignored.
:param hard_class: When ``pred_method='predict_proba'`` use ``True`` to save the predict dominant
class ``(*_hcl.tif)``, the probability ``(*_hcl_prob.tif)`` and uncertainty ``(*_hcl_uncertainty.tif)``
values of each dominant class.
:param inmem_calc_func: Function to be executed before the prediction. Use it to derive covariates/features
on-the-fly, calculating in memory, for example, a NDVI from the red and NIR bands.
:param dict_layers_newnames: A list of dictionaries used to change the raster filenames on-the-fly. Use it to match
the column names for the point samples with different raster filenames.
:param allow_additional_layers: Use ``False`` to throw a ``Exception`` if a read raster is not present
in ``feature_cols``.
:param prediction_strategy_type: Which strategy is used to predict the multiple raster data. By default is ``Lazỳ``,
loading one year while predict the other.
:returns: List with all the raster files produced as output.
:rtype: List[Path]
"""
PredictionStrategyClass = _LazyLoadPrediction
if PredictionStrategyType.Eager == prediction_strategy_type:
PredictionStrategyClass = _EagerLoadPrediction
prediction_strategy = PredictionStrategyClass(self, dirs_layers_list, fn_layers_list, fn_output_list,
spatial_win, dtype, fill_nodata, separate_probs, hard_class, inmem_calc_func,
dict_layers_newnames_list, allow_additional_layers)
return prediction_strategy.run()
[docs] @staticmethod
def load_instance(
fn_joblib
):
"""
Load a class instance from disk.
:param fn_joblib: Location of the saved instance.
:returns: Class instance
:rtype: LandMapper
"""
if not isinstance(fn_joblib, Path):
fn_joblib = Path(fn_joblib)
landmapper = joblib.load(fn_joblib)
for estimator in landmapper.estimator_list:
if landmapper._is_keras_model(estimator):
from tensorflow.keras.models import load_model
fn_keras = fn_joblib.parent.joinpath(f'{fn_joblib.stem}_keras.h5')
estimator['estimator'].model = load_model(fn_keras)
return landmapper
[docs] def save_instance(self,
fn_joblib:Path,
no_train_data:bool = False,
compress:str = 'lz4'
):
"""
Persist the class instance in disk using ``joblib.dump``. Use it to perform prediction
over new raster/point data without retrain the models from scratch.
:param fn_joblib: Location of the output file.
:param no_train_data: Remove all the training data before persist it
in disk.
:param compress: Enable compression.
"""
if not isinstance(fn_joblib, Path):
fn_joblib = Path(fn_joblib)
if no_train_data:
prop_to_del = [ 'pts', 'features', 'target', 'samples_weight', 'cv_groups']
for prop in prop_to_del:
if hasattr(self, prop):
if self.verbose:
ttprint(f'Removing {prop} attribute')
delattr(self, prop)
for estimator in self.estimator_list:
if self._is_keras_model(estimator):
basedir = fn_joblib.parent
fn_keras = fn_joblib.parent.joinpath(f'{fn_joblib.stem}_keras.h5')
estimator['estimator'].model.save(fn_keras)
estimator['estimator'].model = None
result = joblib.dump(self, fn_joblib, compress=compress)
for estimator in self.estimator_list:
if self._is_keras_model(estimator):
from tensorflow.keras.models import load_model
fn_keras = fn_joblib.parent.joinpath(f'{fn_joblib.stem}_keras.h5')
estimator['estimator'].model = load_model(fn_keras)
class _PredictionStrategy(ABC):
def __init__(self,
landmapper:LandMapper,
dirs_layers_list:List = [],
fn_layers_list:List = [],
fn_output_list:List = [],
spatial_win = None,
dtype = 'float32',
fill_nodata = False,
separate_probs = True,
hard_class = True,
inmem_calc_func = None,
dict_layers_newnames_list:list = [],
allow_additional_layers=False):
self.landmapper = landmapper
self.fn_layers_list = self._fn_layers_list(dirs_layers_list, fn_layers_list)
self.fn_output_list = fn_output_list
self.spatial_win = spatial_win
self.dtype = dtype
self.fill_nodata = fill_nodata
self.separate_probs = separate_probs
self.hard_class = hard_class
self.inmem_calc_func = inmem_calc_func
self.dict_layers_newnames_list = dict_layers_newnames_list
self.allow_additional_layers = allow_additional_layers
super().__init__()
def _fn_layers_list(self, dirs_layers_list, fn_layers_list):
if len(fn_layers_list) == 0:
for dirs_layers in dirs_layers_list:
fn_layers_list.append(find_files(dirs_layers, '*.tif'))
return fn_layers_list
@abstractmethod
def run(self):
pass
class _EagerLoadPrediction(_PredictionStrategy):
def __init__(self,
landmapper:LandMapper,
dirs_layers_list:List = [],
fn_layers_list:List = [],
fn_output_list:List = [],
spatial_win = None,
dtype = 'float32',
fill_nodata = False,
separate_probs = True,
hard_class = True,
inmem_calc_func = None,
dict_layers_newnames_list:list = [],
allow_additional_layers=False):
super().__init__(landmapper, dirs_layers_list, fn_layers_list, fn_output_list,
spatial_win, dtype, fill_nodata, separate_probs, hard_class, inmem_calc_func,
dict_layers_newnames_list, allow_additional_layers)
self.reading_futures = []
self.writing_futures = []
self.reading_pool = ThreadPoolExecutor(max_workers = 5)
self.writing_pool = ThreadPoolExecutor(max_workers = 5)
def reading_fn(self, i):
fn_layers = self.fn_layers_list[i]
dict_layers_newnames = {}
if len(self.dict_layers_newnames_list) > 0:
dict_layers_newnames = self.dict_layers_newnames_list[i]
start = time.time()
input_data, _ = self.landmapper._read_layers(fn_layers=fn_layers,
spatial_win=self.spatial_win, inmem_calc_func = self.inmem_calc_func,
dict_layers_newnames = dict_layers_newnames,
allow_additional_layers=self.allow_additional_layers)
self.landmapper._verbose(f'{i+1}) Reading time: {(time.time() - start):.2f} segs')
input_shape = input_data.shape
input_data = input_data.reshape(-1, input_shape[-1])
return (i, input_shape, input_data)
def writing_fn(self, i, pred_result, pred_uncer, nan_mask, input_shape):
fn_layers = self.fn_layers_list[i]
fn_output = self.fn_output_list[i]
fn_base_layer = fn_layers[0]
start = time.time()
fn_out_files = self.landmapper._write_layers(fn_base_layer, fn_output, input_shape, pred_result, \
pred_uncer, nan_mask, self.spatial_win, self.separate_probs, self.hard_class)
self.landmapper._verbose(f'{i+1}) Saving time: {(time.time() - start):.2f} segs')
return fn_out_files
def run(self):
output_fn_files = []
for i in range(0, len(self.fn_layers_list)):
self.reading_futures.append(
self.reading_pool.submit(self.reading_fn, (i))
)
positions, input_shape, input_data = zip(*[ future.result() for future in as_completed(self.reading_futures) ])
input_data = np.concatenate(input_data, axis=0)
self.landmapper._verbose(f'{i+1}) Predicting {input_data.shape[0]} pixels')
nan_mask = np.isnan(input_data)
input_data[nan_mask] = 0
start = time.time()
pred_result, pred_uncer = self.landmapper._predict(input_data)
self.landmapper._verbose(f'{i+1}) Predicting time: {(time.time() - start):.2f} segs')
del input_data
gc.collect()
nan_mask = np.any(nan_mask, axis=1)
n_elements, _ = pred_result.shape
year_data_size = n_elements / len(positions)
for i in positions:
i0 = int(i * year_data_size)
i1 = int((i+1) * year_data_size)
pred_result_year = pred_result[i0:i1,:]
pred_uncer_year = pred_uncer[i0:i1,:]
nan_mask_year = nan_mask[i0:i1]
input_shape_year = input_shape[i]
self.writing_futures.append(
self.writing_pool.submit(self.writing_fn, i, pred_result_year, pred_uncer_year, nan_mask_year, input_shape_year)
)
output_fn_files = []
for future in as_completed(self.writing_futures):
output_fn_files += future.result()
self.reading_pool.shutdown(wait=False)
self.writing_pool.shutdown(wait=False)
output_fn_files.sort()
return output_fn_files
class _LazyLoadPrediction(_PredictionStrategy):
def __init__(self,
landmapper:LandMapper,
dirs_layers_list:List = [],
fn_layers_list:List = [],
fn_output_list:List = [],
spatial_win = None,
dtype = 'float32',
fill_nodata = False,
separate_probs = True,
hard_class = True,
inmem_calc_func = None,
dict_layers_newnames_list:list = [],
allow_additional_layers=False,
reading_pool_size = 1,
processing_pool_size = 1,
writing_pool_size = 1):
super().__init__(landmapper, dirs_layers_list, fn_layers_list, fn_output_list,
spatial_win, dtype, fill_nodata, separate_probs, hard_class, inmem_calc_func,
dict_layers_newnames_list, allow_additional_layers)
self.data_pool = {}
self.result_pool = {}
self.reading_futures = []
self.processing_futures = []
self.writing_futures = []
self.reading_pool = ThreadPoolExecutor(max_workers = reading_pool_size)
self.processing_pool = ThreadPoolExecutor(max_workers = processing_pool_size)
self.writing_pool = ThreadPoolExecutor(max_workers = writing_pool_size)
def reading_fn(self, i):
fn_layers = self.fn_layers_list[i]
dict_layers_newnames = {}
if len(self.dict_layers_newnames_list) > 0:
dict_layers_newnames = self.dict_layers_newnames_list[i]
start = time.time()
input_data, _ = self.landmapper._read_layers(fn_layers=fn_layers,
spatial_win=self.spatial_win, inmem_calc_func = self.inmem_calc_func,
dict_layers_newnames = dict_layers_newnames,
allow_additional_layers=self.allow_additional_layers)
self.landmapper._verbose(f'{i+1}) Reading time: {(time.time() - start):.2f} segs')
input_data_key = str(uuid.uuid4())
self.data_pool[input_data_key] = input_data
self.processing_futures.append(
self.processing_pool.submit(self.processing_fn, i, input_data_key)
)
def processing_fn(self, i, input_data_key):
input_data = self.data_pool[input_data_key]
x_size, y_size, n_features = input_data.shape
input_data = input_data.reshape(-1, n_features)
self.landmapper._verbose(f'{i+1}) Predicting {x_size * y_size} pixels')
nan_mask = np.isnan(input_data)
input_data[nan_mask] = 0
start = time.time()
pred_result, pred_uncer = self.landmapper._predict(input_data)
self.landmapper._verbose(f'{i+1}) Predicting time: {(time.time() - start):.2f} segs')
del self.data_pool[input_data_key]
gc.collect()
nan_mask = np.any(nan_mask, axis=1)
input_data_shape = (x_size, y_size, n_features)
self.result_pool[input_data_key] = (pred_result, pred_uncer, nan_mask, input_data_shape)
self.writing_futures.append(
self.writing_pool.submit(self.wrinting_fn, i, input_data_key)
)
def wrinting_fn(self, i, input_data_key):
pred_result, pred_uncer, nan_mask, input_data_shape = self.result_pool[input_data_key]
fn_layers = self.fn_layers_list[i]
fn_output = self.fn_output_list[i]
fn_base_layer = fn_layers[0]
start = time.time()
fn_out_files = self.landmapper._write_layers(fn_base_layer, fn_output, input_data_shape, pred_result, \
pred_uncer, nan_mask, self.spatial_win, self.separate_probs, self.hard_class)
self.landmapper._verbose(f'{i+1}) Saving time: {(time.time() - start):.2f} segs')
del self.result_pool[input_data_key]
gc.collect()
return fn_out_files
def run(self):
output_fn_files = []
for i in range(0, len(self.fn_layers_list)):
self.reading_futures.append(
self.reading_pool.submit(self.reading_fn, (i))
)
reading_results = [ future.result() for future in as_completed(self.reading_futures) ]
processing_results = [ future.result() for future in as_completed(self.processing_futures) ]
output_fn_files = sum([ future.result() for future in as_completed(self.writing_futures) ], [])
self.reading_pool.shutdown(wait=False)
self.processing_pool.shutdown(wait=False)
self.writing_pool.shutdown(wait=False)
output_fn_files.sort()
return output_fn_files
class _ParallelOverlay:
# optimized for up to 200 points and about 50 layers
# sampling only first band in every layer
# assumption is that all layers have same blocks
def __init__(self,
points_x: np.ndarray,
points_y:np.ndarray,
fn_layers:List[str],
max_workers:int = parallel.CPU_COUNT,
verbose:bool = True
):
self.error_val = -32768
self.points_x = points_x
self.points_y = points_y
self.points_len = len(points_x)
self.fn_layers = fn_layers
self.max_workers = max_workers
self.verbose = verbose
self.layer_names = [fn_layer.with_suffix('').name for fn_layer in fn_layers]
sources = [rasterio.open(fn_layer) for fn_layer in self.fn_layers]
self.dimensions = [self._get_dimension(src) for src in sources]
self.points_blocks = self.find_blocks()
self.result = None
@staticmethod
def _get_dimension(src):
return (src.height, src.width, *src.block_shapes[0], *src.transform.to_gdal())
def _find_blocks_for_src_mt(self, ij, block, src, ptsx, ptsy):
left, bottom, right, top = rasterio.windows.bounds(block, src.transform)
ind = (ptsx>=left) & (ptsx<right) & (ptsy>bottom) & (ptsy<=top)
if ind.any():
inv_block_transform = ~rasterio.windows.transform(block, src.transform)
col, row = inv_block_transform * (ptsx[ind], ptsy[ind])
result = [block, np.nonzero(ind)[0], col.astype(int), row.astype(int)]
return ij, result
else:
return None, None
def _find_blocks_for_src(self, src, ptsx, ptsy):
# find blocks for every point in given source
blocks = {}
args = src.block_windows(1)
fixed_args = (src, ptsx, ptsy)
chunk_size = math.ceil(len(list(src.block_windows(1))) / self.max_workers)
for ij, result in parallel.ThreadGeneratorLazy(self._find_blocks_for_src_mt, args,
self.max_workers, chunk=chunk_size, fixed_args = fixed_args):
if ij is not None:
blocks[ij] = result
return blocks
def find_blocks(self):
# for every type of dimension find block for each point
points_blocks = {}
dimensions_set = set(self.dimensions)
sources = [rasterio.open(fn_layer) for fn_layer in self.fn_layers]
for dim in dimensions_set:
src = sources[self.dimensions.index(dim)]
points_blocks[dim] = self._find_blocks_for_src(src, self.points_x, self.points_y)
return points_blocks
def _sample_one_layer_sp(self, fn_layer):
with rasterio.open(fn_layer) as src:
#src=rasterio.open(fn_layer)
dim = self._get_dimension(src)
# out_sample = np.full((self.points_len,), src.nodata, src.dtypes[0])
out_sample = np.full((self.points_len,), np.nan, np.float32)
blocks = self.points_blocks[dim]
for ij in blocks:
# ij=next(iter(blocks)); (window, ind, col, row) = blocks[ij]
(window, ind, col, row) = blocks[ij]
try:
data = src.read(1, window=window)
mask = src.read_masks(1, window=window)
sample = data[row,col].astype(np.float32)
sample_mask = mask[row,col].astype(bool)
sample[~sample_mask] = np.nan
#sample = data[row.astype(int),col.astype(int)]
except Exception as exception:
traceback.print_exc()
sample = self.error_val
out_sample[ind] = sample
return out_sample, fn_layer
def _sample_one_block(self, args):
out_sample, fn_layer, window, ind, col, row = args
with rasterio.open(fn_layer) as src:
try:
data = src.read(1, window=window)
sample = data[row,col]
except Exception as exception:
traceback.print_exc()
sample = self.error_val
out_sample[ind] = sample
#return sample, ind
def _sample_one_layer_mt(self, fn_layer):
with rasterio.open(fn_layer) as src:
dim = self._get_dimension(src)
out_sample = np.full((self.points_len,), src.nodata, src.dtypes[0])
blocks = self.points_blocks[dim]
args = ((out_sample, fn_layer, *blocks[ij]) for ij in blocks)
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for _ in executor.map(self._sample_one_block, args, chunksize=1000):
pass #out_sample[ind] = sample
return out_sample
def _sample_one_layer_mp(self, fn_layer):
with rasterio.open(fn_layer) as src:
dim = self._get_dimension(src)
out_sample = np.full((self.points_len,), src.nodata, src.dtypes[0])
blocks = self.points_blocks[dim]
args = ((out_sample, fn_layer, *blocks[ij]) for ij in blocks)
with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
for _ in executor.map(self._sample_one_block, args, chunksize=1000):
pass #out_sample[ind] = sample
return out_sample
def sample_v1(self):
'''
Serial sampling, block by block
'''
res={}
n_layers = len(self.fn_layers)
for i_layer, fn_layer in enumerate(self.fn_layers):
if self.verbose:
ttprint(f'{i_layer}/{n_layers} {Path(fn_layer).name}')
# fn_layer=self.fn_layers[0]
col = Path(fn_layer).with_suffix('').name
sample,_ = self._sample_one_layer_sp(fn_layer)
res[col]=sample
return res
def sample_v2(self):
'''
Serial layers, parallel blocks in layer
'''
res={}
n_layers = len(self.fn_layers)
for i_layer, fn_layer in enumerate(self.fn_layers):
if self.verbose:
ttprint(f'{i_layer}/{n_layers} {Path(fn_layer).name}')
# fn_layer=self.fn_layers[0]
col = Path(fn_layer).with_suffix('').name
sample = self._sample_one_layer_mt(fn_layer)
res[col]=sample
return res
def sample_v3(self):
'''
Parallel layers in threads, serial blocks in layer
'''
res={}
i_layer=1
n_layers=len(self.fn_layers)
args = ((fn_layer.as_posix(),) for fn_layer in self.fn_layers)
for sample, fn_layer in parallel.ThreadGeneratorLazy(self._sample_one_layer_sp, args,
self.max_workers, self.max_workers*2):
col = Path(fn_layer).with_suffix('').name
if self.verbose:
ttprint(f'{i_layer}/{n_layers} {col}')
res[col] = sample
i_layer += 1
return res
def sample_v4(self):
'''
Parallel layers in processes, serial blocks in layer
'''
res={}
i_layer=1
n_layers=len(self.fn_layers)
args = ((fn_layer.as_posix(),) for fn_layer in self.fn_layers)
#with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
#for sample, fn_layer in executor.map(self._sample_one_layer_sp, args, chunksize=n_layers//self.max_workers):
for sample, fn_layer in parallel.ProcessGeneratorLazy(self._sample_one_layer_sp, args, self.max_workers, 1):
col = Path(fn_layer).with_suffix('').name
if self.verbose:
ttprint(f'{i_layer}/{n_layers} {col}')
res[col] = sample
i_layer += 1
return res
def run(self):
# For now sample_v3 is the best method, because is parallel and use less memory than sample_v4
if self.result == None:
self.result = self.sample_v3()
else:
if self.verbose:
ttprint('You already did run the overlay. Geting the cached result')
return self.result
[docs]class SpaceOverlay():
"""
Overlay a set of points over multiple raster files.
The retrieved pixel values are organized in columns
according to the filenames.
:param points: The path for vector file or ``geopandas.GeoDataFrame`` with
the points.
:param fn_layers: A list with the raster file paths. If it's not provided
the ``dir_layers`` and ``regex_layers`` are used to retrieve the raster files.
:param dir_layers: A list of folders where the raster files are located. The raster
are selected according to the pattern specified in ``regex_layers``.
:param regex_layers: Pattern to select the raster files in ``dir_layers``.
By default all GeoTIFF files are selected.
:param max_workers: Number of CPU cores to be used in parallel. By default all cores
are used.
:param verbose: Use ``True`` to print the overlay progress.
Examples
========
>>> from eumap.mapper import SpaceOverlay
>>>
>>> spc_overlay = SpaceOverlay('./my_points.gpkg', ['./raster_dir_1', './raster_dir_2'])
>>> result = spc_overlay.run()
>>>
>>> print(result.shape)
"""
def __init__(self,
points,
fn_layers:List[str] = [],
dir_layers:List[str] = [],
regex_layers = '*.tif',
max_workers:int = parallel.CPU_COUNT,
verbose:bool = True
):
if len(fn_layers) == 0:
fn_layers = find_files(dir_layers, regex_layers)
self.fn_layers = [ Path(l) for l in fn_layers ]
if not isinstance(points, gpd.GeoDataFrame):
points = gpd.read_file(points)
self.pts = points
self.pts['overlay_id'] = range(1,len(self.pts)+1)
self.parallelOverlay = _ParallelOverlay(self.pts.geometry.x.values, self.pts.geometry.y.values,
fn_layers, max_workers, verbose)
[docs] def run(self,
dict_newnames:set = {}
):
"""
Execute the space overlay.
:param dict_newnames: A dictionary used to update the column names after the overlay.
The ``key`` is the new name and the ``value`` is the raster file name (without extension).
:returns: Data frame with the original columns plus the overlay result (one new
column per raster).
:rtype: geopandas.GeoDataFrame
"""
result = self.parallelOverlay.run()
for col in result:
new_col = col
for newname in dict_newnames.keys():
new_col = re.sub(dict_newnames[newname], newname, new_col)
self.pts.loc[:,new_col] = result[col]
return self.pts
[docs]class SpaceTimeOverlay():
"""
Overlay a set of points over multiple raster considering the year information.
The retrieved pixel values are organized in columns according to the filenames.
:param points: The path for vector file or ``geopandas.GeoDataFrame`` with
the points.
:param col_date: Date column to retrieve the year information.
:param fn_layers: A list with the raster file paths. The file path placeholders
``{year}``, ``{year_minus_1}``, ``{year_plus_1}`` are replaced considering the
year information of each point.
:param max_workers: Number of CPU cores to be used in parallel. By default all cores
are used.
:param verbose: Use ``True`` to print the overlay progress.
Examples
========
>>> from eumap.mapper import SpaceTimeOverlay
>>>
>>> fn_layers = [ 'raster_{year_minus_1}1202..{year}0320.tif' ] # raster_20101202..20110320.tif, ...
>>> spt_overlay = SpaceTimeOverlay('./my_points.gpkg', 'survey_date' fn_layers)
>>> result = spt_overlay.run()
>>>
>>> print(result.shape)
"""
def __init__(self,
points,
col_date:str,
fn_layers:List[str] = [],
max_workers:int = parallel.CPU_COUNT,
verbose:bool = False
):
if not isinstance(points, gpd.GeoDataFrame):
points = gpd.read_file(points)
self.pts = points
self.col_date = col_date
self.overlay_objs = {}
self.verbose = verbose
self.year_placeholder = '{year}'
self.pts.loc[:,self.col_date] = pd.to_datetime(self.pts[self.col_date])
self.uniq_years = self.pts[self.col_date].dt.year.unique()
self.fn_layers = [ Path(l) for l in fn_layers ]
for year in self.uniq_years:
year = int(year)
year_points = self.pts[self.pts[self.col_date].dt.year == year]
fn_layers_year = []
for fn_layer in self.fn_layers:
fn_layers_year.append(Path(self._replace_year(fn_layer, year)))
if self.verbose:
ttprint(f'Overlay {len(year_points)} points from {year} in {len(fn_layers_year)} raster layers')
self.overlay_objs[year] = SpaceOverlay(points=year_points, fn_layers=fn_layers_year,
max_workers=max_workers, verbose=verbose)
def _replace_year(self, fn_layer, year = None):
y, y_m1, y_p1 = ('', '', '')
if year != None:
y, y_m1, y_p1 = str(year), str((year - 1)), str((year + 1))
fn_layer = str(fn_layer)
return fn_layer \
.replace('{year}', y) \
.replace('{year_minus_1}', y_m1) \
.replace('{year_plus_1}', y_p1) \
[docs] def run(self):
"""
Execute the spacetime overlay. It removes the year part from the column names.
For example, the raster ``raster_20101202..20110320.tif`` results in the column
name ``raster_1202..0320``.
:returns: Data frame with the original columns plus the overlay result (one new
column per raster).
:rtype: geopandas.GeoDataFrame
"""
self.result = None
for year in self.uniq_years:
year_newnames = {}
for fn_layer in self.fn_layers:
name = str(fn_layer.stem)
curname = self._replace_year(name, year)
newname = self._replace_year(name)
year_newnames[newname] = curname
if self.verbose:
ttprint(f'Running the overlay for {year}')
year_result = self.overlay_objs[year].run(year_newnames)
if self.result is None:
self.result = year_result
else:
self.result = self.result.append(year_result)
return self.result