Source code for eumap.misc

'''
Miscellaneous utils
'''

from typing import List, Dict, Union, Iterable
from datetime import datetime, timedelta
from functools import reduce

import rasterio
import geopandas as gp
import pandas as pd
import numpy as np
from pathlib import Path

def _warn_deps(e, module_name):
    import warnings
    warnings.warn(
        f'ERROR: {e}\n\n' \
        f'This occurs because {module_name} has additional dependencies. Please try:\n\n' \
        '\tpip install eumap[full]\n'
    )

def ttprint(*args, **kwargs):
    """
    A print function that displays the date and time.

    Examples
    ========

    >>> from eumap.misc import ttprint
    >>> ttprint('eumap rocks!')

    """
    print(f'[{datetime.now():%H:%M:%S}] ', end='')
    print(*args, **kwargs, flush=True)
def find_files(
    dir_list:List,
    pattern:str = '*.*'
):
    """
    Recursively find files in multiple directories according to the
    specified pattern. It's basically a wrapper for the glob module [1].

    :param dir_list: List with multiple directory paths.
    :param pattern: Pattern to match the desired files.

    Examples
    ========

    >>> from eumap.misc import find_files
    >>> libs_so = find_files(['/lib', '/usr/lib64/'], '*.so')
    >>> print(f'{len(libs_so)} files found')

    References
    ==========

    [1] `Python glob module <https://docs.python.org/3/library/glob.html>`_

    """
    files = []

    if not isinstance(dir_list, list):
        dir_list = [dir_list]

    glob_pattern = f'**/{pattern}'

    for _dir in dir_list:
        for file in list(Path(_dir).glob(glob_pattern)):
            files.append(Path(file))

    files = sorted(files)

    return files
def nan_percentile(
    arr:np.ndarray,
    q:List = [25, 50, 75],
    keep_original_vals=False
):
    """
    Optimized function to calculate percentiles ignoring ``np.nan`` in a
    3D Numpy array [1].

    :param arr: 3D Numpy array where the first dimension is used to derive the percentiles.
    :param q: Percentile values between 0 and 100.
    :param keep_original_vals: If ``True``, a copy of ``arr`` is made so the
        input array keeps its original structure and values.

    Examples
    ========

    >>> import numpy as np
    >>> from eumap.misc import nan_percentile
    >>>
    >>> data = np.random.rand(10, 10, 10)
    >>> data[2:5,0:10,0] = np.nan
    >>> data_perc = nan_percentile(data, q=[25, 50, 75])
    >>> print(f'Shape: data={data.shape} data_perc={data_perc.shape}')

    References
    ==========

    [1] `Kersten's blog <https://krstn.eu/np.nanpercentile()-there-has-to-be-a-faster-way>`_

    """
    # normalize the requested percentiles to a list
    if type(q) is list:
        qs = list(q)
    else:
        qs = [q]

    # eliminate duplicated percentiles, preserving the input order
    qs_unique = list(dict.fromkeys(qs))
    if len(qs_unique) != len(qs):
        print('duplicate percentile is eliminated')
    qs = qs_unique

    if keep_original_vals:
        arr = np.copy(arr)

    # pixels where all observations are NaN
    nanall = np.all(np.isnan(arr), axis=0)
    # pixels with a single valid observation
    single_value = ~(np.sum(~np.isnan(arr), axis=0) - 1).astype(bool)
    res_shape = (len(qs), arr.shape[1], arr.shape[2])
    nanall = np.broadcast_to(nanall, shape=res_shape)

    # valid (non NaN) observations along the first axis
    valid_obs = np.sum(np.isfinite(arr), axis=0)
    # replace NaN with the maximum value
    max_val = np.nanmax(arr)
    arr[np.isnan(arr)] = max_val
    # sort - former NaNs will move to the end
    arr = np.sort(arr, axis=0)

    result = []
    for quant in qs:
        # desired position as well as its floor and ceiling
        k_arr = (valid_obs - 1) * (quant / 100.0)
        f_arr = np.floor(k_arr).astype(np.int32)
        c_arr = np.ceil(k_arr).astype(np.int32)
        fc_equal_k_mask = f_arr == c_arr

        # linear interpolation (like numpy percentile) takes the fractional part of the desired position
        floor_val = _zvalueFromIndex(arr=arr, ind=f_arr) * (c_arr - k_arr)
        ceil_val = _zvalueFromIndex(arr=arr, ind=c_arr) * (k_arr - f_arr)

        quant_arr = floor_val + ceil_val
        # if floor == ceiling take the floor value
        quant_arr[fc_equal_k_mask] = _zvalueFromIndex(arr=arr, ind=k_arr.astype(np.int32))[fc_equal_k_mask]

        result.append(quant_arr)

    result = np.stack(result, axis=0)
    result[nanall] = np.nan

    # for single-observation pixels keep only the median, mask the other percentiles
    md = [i == 50 for i in qs]
    if sum(md) == 1:
        md_value = np.copy(result[md])
        result[:, single_value] = np.nan
        result[md] = md_value
    else:
        result[:, single_value] = np.nan

    return result
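# A minimal sanity check (hypothetical helper, not part of the eumap API):
# as long as every pixel keeps at least two valid observations,
# nan_percentile should agree with np.nanpercentile, which applies the same
# linear interpolation between the floor and ceiling positions.
def _demo_nan_percentile():
    data = np.random.rand(8, 4, 4)
    data[0:3, 0, 0] = np.nan
    # keep_original_vals=True so the reference is computed on untouched data
    ours = nan_percentile(data, q=[25, 50, 75], keep_original_vals=True)
    ref = np.nanpercentile(data, [25, 50, 75], axis=0)
    assert np.allclose(ours, ref)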
def _zvalueFromIndex(arr, ind):
    """
    Private helper function to work around the limitation of np.choose() by
    employing np.take(): arr has to be a 3D array and ind a 2D array
    containing the z-indices to take from arr.

    See: http://stackoverflow.com/a/32091712/4169585

    This is faster and more memory efficient than the ogrid-based solution
    with fancy indexing.
    """
    # get number of columns and rows
    _, nC, nR = arr.shape

    # get linear indices and extract elements with np.take()
    idx = nC * nR * ind + np.arange(nC * nR).reshape((nC, nR))
    return np.take(arr, idx)

def _stringify(arr):
    if isinstance(arr, np.ndarray):
        try:
            arr = arr.astype(int)
        except ValueError:
            pass
        arr = arr.astype(str)
    return arr

def _add_group_elements(a1, a2):
    return np.core.defchararray.add(a1, _stringify(a2))
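# A small illustration (hypothetical, not part of the eumap API) of the
# linear-index trick used by _zvalueFromIndex: flattening arr and computing
# z * nC * nR + (row * nR + col) selects the same elements as the
# equivalent np.ogrid-based fancy indexing.
def _demo_zvalue_from_index():
    arr = np.random.rand(5, 3, 4)
    ind = np.random.randint(0, 5, size=(3, 4))
    rows, cols = np.ogrid[0:3, 0:4]
    assert np.array_equal(_zvalueFromIndex(arr, ind), arr[ind, rows, cols])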
def sample_groups(
    points: gp.GeoDataFrame,
    *group_element_columns: Iterable[str],
    spatial_resolution: Union[int, float]=None,
    temporal_resolution: timedelta=None,
    date_column: str='date',
) -> np.ndarray:
    """
    Construct group IDs for spatial and temporal cross-validation.
    Groups point samples into tiles of ``spatial_resolution`` width and height
    and/or intervals of ``temporal_resolution`` size. ``group_element_columns``
    are also concatenated into the final group ID of each sample, as shown
    in the sketch after this function.

    :param points: GeoDataFrame containing point samples.
    :param group_element_columns: Names of additional columns to be concatenated into the final group IDs.
    :param spatial_resolution: Tile size (both x and y) for grouping, in sample CRS units.
    :param temporal_resolution: Interval size for grouping.
    :param date_column: Name of the column containing sample timestamps (as datetime objects).

    :returns: 1D string array containing the group ID of each sample.

    Examples
    ========

    >>> import geopandas as gp
    >>> import pygeos as pg
    >>> import numpy as np
    >>> from datetime import datetime, timedelta
    >>> from sklearn.linear_model import LogisticRegression
    >>> from sklearn.model_selection import cross_val_score, GroupKFold
    >>>
    >>> from eumap.misc import sample_groups
    >>>
    >>> # construct some synthetic point data
    >>> coords = np.random.random((1000, 2)) * 4000
    >>> dates = datetime.now() + np.array([*map(
    >>>     timedelta,
    >>>     range(1000),
    >>> )])
    >>>
    >>> points = gp.GeoDataFrame({
    >>>     'geometry': pg.points(coords),
    >>>     'date': dates,
    >>>     'group': np.random.choice(['a', 'b'], size=1000),
    >>>     'predictor': np.random.random(1000),
    >>>     'target': np.random.randint(2, size=1000),
    >>> })
    >>>
    >>> # get the point groups
    >>> groups = sample_groups(
    >>>     points,
    >>>     'group',
    >>>     spatial_resolution=1000,
    >>>     temporal_resolution=timedelta(days=365),
    >>> )
    >>>
    >>> print(np.unique(groups))
    >>>
    >>> kfold = GroupKFold(n_splits=5)
    >>>
    >>> # cross validate a classifier
    >>> print(cross_val_score(
    >>>     estimator=LogisticRegression(),
    >>>     X=points.predictor.values.reshape(-1, 1),
    >>>     y=points.target,
    >>>     scoring='f1',
    >>>     cv=kfold, # group-aware CV splitter
    >>>     groups=groups, # our groups go here
    >>> ))

    """
    group_elements = [
        points[col].values.astype(str)
        for col in group_element_columns
    ]

    if all((
        not group_elements,
        spatial_resolution is None,
        temporal_resolution is None,
    )):
        raise ValueError(
            'no group elements, specify one or more of the following:\n' \
            '\tgroup_element_columns\n' \
            '\tspatial_resolution\n' \
            '\ttemporal_resolution\n' \
        )

    if spatial_resolution is not None:
        x_el = points.geometry.x.values // spatial_resolution
        x_el -= x_el.min()
        group_elements += ['x', x_el]

        y_el = points.geometry.y.values // spatial_resolution
        y_el -= y_el.min()
        group_elements += ['y', y_el]

    if temporal_resolution is not None:
        _tres = np.timedelta64(temporal_resolution)
        times = points[date_column].values
        t_el = (times - times.min()) // _tres
        group_elements += ['t', t_el]

    return reduce(
        _add_group_elements,
        group_elements,
    )
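# A short sketch (hypothetical, not part of the eumap API) of how the
# reduce(...) call above assembles group IDs: string labels, literal
# separators and numeric tile/interval indices are concatenated pairwise,
# with _stringify casting numeric arrays to int and then to str.
def _demo_group_ids():
    labels = np.array(['a', 'b'])
    x_tiles = np.array([3., 7.])  # e.g. x // spatial_resolution
    t_bins = np.array([0., 1.])   # e.g. (date - min date) // temporal_resolution
    ids = reduce(_add_group_elements, [labels, 'x', x_tiles, 't', t_bins])
    assert list(ids) == ['ax3t0', 'bx7t1']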
def _eval(str_val, args):
    return eval("f'" + str_val + "'", args)

try:

    import gspread
    import pytz
    class GoogleSheet():
        """
        Utility class to convert a remote Google Spreadsheet file into a
        ``pandas.DataFrame``. Each sheet is converted to a separate
        ``pandas.DataFrame`` accessible as an instance attribute.

        :param key_file: Authentication key to access spreadsheets via the Google Sheets API.
        :param url: Complete URL referring to a Google Spreadsheet file (publicly accessible).
        :param col_list_suffix: All the columns with this suffix are converted to lists of strings.
        :param col_list_delim: Text delimiter used to separate the list elements.
        :param col_date_suffix: All the columns with this suffix are converted to date objects.
        :param col_date_format: Date format used to convert string values into date objects.
        :param verbose: Use ``True`` to print the progress of all steps.

        Examples
        ========

        >>> # Generate your key following the instructions in https://docs.gspread.org/en/latest/oauth2.html
        >>> key_file = '<GDRIVE_KEY>'
        >>> # Publicly accessible Google Spreadsheet (anyone on the internet with this link can view)
        >>> url = 'https://docs.google.com/spreadsheets/d/1O3n5O6MQ3OPX--ZbJEREC5fu-bLKK2AaTYDqeAPMRQY/edit?usp=sharing'
        >>>
        >>> gsheet = GoogleSheet(key_file, url)
        >>> print('Sheet points_nl: ', gsheet.points_nl.shape)
        >>> print('Sheet tiles: ', gsheet.tiles.shape)

        References
        ==========

        [1] `Authentication - gspread <https://docs.gspread.org/en/latest/oauth2.html>`_

        """

        def __init__(self,
            key_file:str,
            url:str,
            col_list_suffix:str = '_list',
            col_list_delim:str = ',',
            col_date_suffix:str = '_date',
            col_date_format:str = '%Y-%m-%d',
            verbose:bool = False,
        ):
            self.key_file = key_file
            self.url = url
            self.verbose = verbose

            self.col_list_suffix = col_list_suffix
            self.col_list_delim = col_list_delim
            self.col_date_suffix = col_date_suffix
            self.col_date_format = col_date_format

            self._read_gsheet()

        def _verbose(self, *args, **kwargs):
            if self.verbose:
                ttprint(*args, **kwargs)

        def _read_gsheet(self):
            gc = gspread.service_account(filename=self.key_file)

            self._verbose(f"Accessing {self.url}")
            sht = gc.open_by_url(self.url)

            for wsht in sht.worksheets():
                self._verbose(f"Retrieving the data from {wsht.title}")
                rows = wsht.get_values()
                title = wsht.title
                try:
                    setattr(self, title, self._parse_df(rows))
                except Exception:
                    ttprint(f'ERROR: unable to parse {title}, ignoring it.')

        def _parse_df(self, rows):
            df = pd.DataFrame(rows[1:], columns=rows[0])
            to_drop = []

            for column in df.columns:
                self._verbose(f" Parsing column {column}")

                if column.endswith(self.col_list_suffix):
                    new_column = column.replace(self.col_list_suffix, '')
                    df[new_column] = df[column].str.split(self.col_list_delim)
                    to_drop.append(column)

                if column.endswith(self.col_date_suffix):
                    df[column] = pd.to_datetime(df[column], format=self.col_date_format, errors='coerce')

            return df.drop(columns=to_drop)
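    # A short sketch (hypothetical, not part of the eumap API) of the
    # suffix-based column parsing: it bypasses the network access by creating
    # an uninitialised instance, so only _parse_df is exercised.
    def _demo_parse_df():
        gs = GoogleSheet.__new__(GoogleSheet)
        gs.verbose = False
        gs.col_list_suffix = '_list'
        gs.col_list_delim = ','
        gs.col_date_suffix = '_date'
        gs.col_date_format = '%Y-%m-%d'
        rows = [
            ['name', 'tags_list', 'start_date'],
            ['a', 'x,y', '2020-01-01'],
        ]
        df = gs._parse_df(rows)
        # 'tags_list' becomes a list column 'tags'; 'start_date' becomes datetime
        assert list(df.tags[0]) == ['x', 'y']
        assert df.start_date.dtype.kind == 'M'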
except ImportError as e:
    # _warn_deps is defined at the top of this module
    _warn_deps(e, 'misc.GoogleSheet')