'''
Gapfilling approaches using temporal and spatial neighbor pixels
'''
import multiprocessing
import traceback
import warnings
import time
import math
import os
import gc
try:
from itertools import cycle, islice, chain
from typing import List, Dict, Union
from rasterio.windows import Window
from abc import ABC, abstractmethod
from pathlib import Path
from enum import Enum
from pyts.decomposition import SingularSpectrumAnalysis
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import bottleneck as bc
import cv2 as cv
import numpy as np
from .misc import ttprint
from .datasets import DATA_ROOT_NAME
from . import parallel
from .raster import read_rasters, save_rasters
class OutlierRemover(Enum):
    """
    Strategy to remove outliers considering the temporal domain.
    """
    #: Standard deviation moving window
    Std = 1
    #: Percentile calculation
    Percentile = 2
class ImageGapfill(ABC):
    """
    Abstract class responsible for read/write the raster files in
    all implemented gapfilling methods.

    :param fn_files: Raster file paths to be read and gapfilled. The filename alphabetic order
        is used to infer the temporal order for the read data.
    :param data: 3D array where the last dimension is the time.
    :param outlier_remover: Strategy to remove outliers.
    :param std_win: Temporal window size used to calculate a local median and std.
    :param std_env: Number of std used to define a local envelope around the median.
        Values outside of this envelope are removed.
    :param perc_env: A list containing the lower and upper percentiles used to define a global
        envelope for the time series. Values outside of this envelope are removed.
        Defaults to ``[2, 98]``.
    :param n_jobs_io: Number of parallel jobs to read/write raster files.
    :param verbose: Use ``True`` to print the progress of the gapfilling.
    """

    def __init__(self,
        fn_files:List = None,
        data:np.array = None,
        outlier_remover:OutlierRemover = None,
        std_win:int = 3,
        std_env:int = 2,
        perc_env:list = None,
        n_jobs_io:int = 4,
        verbose:bool = True,
    ):
        if data is None and fn_files is None:
            raise ValueError('You should provide at least one of these: data or fn_files')

        self.n_jobs_io = n_jobs_io

        if data is None:
            # Read the time series from disk; the last axis of self.data is time
            self.fn_files = fn_files
            self.data, _ = read_rasters(raster_files=fn_files, verbose=verbose, n_jobs=self.n_jobs_io)
        else:
            self.data = data

        self.outlier_remover = outlier_remover
        # Default resolved here instead of using a shared mutable default argument
        self.perc_env = [2, 98] if perc_env is None else perc_env
        self.std_win = std_win
        self.std_env = std_env

        # An odd window is required so the window is centered on each date
        if self.std_win is not None and (self.std_win % 2) == 0:
            raise ValueError('The std_win argument must be an odd number')

        self.outlier_fn = None
        if OutlierRemover.Std == outlier_remover:
            self.outlier_fn = self._remove_outliers_std
        elif OutlierRemover.Percentile == outlier_remover:
            self.outlier_fn = self._remove_outliers_perc

        self.verbose = verbose

    def _verbose(self, *args, **kwargs):
        # Print with timestamp only when verbose mode is on
        if self.verbose:
            ttprint(*args, **kwargs)

    def _n_gaps(self, data=None):
        # Number of NaN cells in data (defaults to the object's time series)
        if data is None:
            data = self.data
        return np.sum(np.isnan(data).astype('int'))

    def _remove_outliers_perc(self, ts_data):
        # Remove values outside a global [lower, upper] percentile envelope
        ts_data = ts_data.copy()
        lower_pct, higher_pct = np.nanpercentile(ts_data, q=self.perc_env)
        valid_mask = np.logical_and(ts_data >= lower_pct, ts_data <= higher_pct)
        ts_data[~valid_mask] = np.nan
        return ts_data

    def _remove_outliers_std(self, ts_data):
        # Remove values outside a moving-window median +/- std_env * std envelope.
        # Time series composed only of NaN values are returned untouched.
        if np.sum(np.isnan(ts_data).astype('int')) != ts_data.shape[0]:
            ts_data = ts_data.astype('float32')
            ts_size = ts_data.shape[0]

            with warnings.catch_warnings():
                warnings.filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
                glob_med = bc.nanmedian(ts_data)

            ts_neib = int((self.std_win - 1) / 2)
            env = self.std_env

            outliers = []
            for i in range(0, ts_size):
                i0 = 0 if (i - ts_neib) < 0 else (i - ts_neib)
                i1 = ts_size if (i + ts_neib) + 1 > ts_size else (i + ts_neib) + 1

                # Expand the window in the boundary years so it keeps std_win samples
                if i1 == ts_size:
                    i0 -= self.std_win - (i1 - i0)
                if i0 == 0:
                    i1 += self.std_win - (i1 - i0)

                # NaN values are replaced by the global median before the local stats
                win_data = ts_data[i0:i1].copy()
                win_data[np.isnan(win_data)] = glob_med

                with warnings.catch_warnings():
                    warnings.filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
                    med = bc.nanmedian(win_data)
                    std = bc.nanstd(win_data)

                lower = (med - std * env)
                upper = (med + std * env)
                outliers.append(lower > ts_data[i] or upper < ts_data[i])

            outliers = np.array(outliers)
            ts_data[outliers] = np.nan

        return ts_data.astype('float16')

    def _perc_gapfilled(self, gapfilled_data):
        # Fraction of the original gaps that received a value in gapfilled_data
        data_nan = np.isnan(self.data)
        n_nan = np.sum(data_nan.astype('int'))
        n_gapfilled = np.sum(data_nan[~np.isnan(gapfilled_data)].astype('int'))
        return n_gapfilled / n_nan

    def run(self):
        """
        Execute the gapfilling approach.

        Optionally removes temporal outliers first, then delegates to the
        concrete ``_gapfill`` implementation.

        :returns: The gapfilled 3D array.
        """
        self._verbose(f'There are {self._n_gaps()} gaps in {self.data.shape}')
        start = time.time()

        if self.outlier_fn:
            self._verbose(f'Removing temporal outliers using {self.outlier_remover}')
            data_noout = parallel.apply_along_axis(self.outlier_fn, 2, self.data)
            # Cells that were valid before and are NaN now were flagged as outliers
            self.outlier_mask = np.logical_and(~np.isnan(self.data), np.isnan(data_noout))
            n_outliers = np.sum(self.outlier_mask.astype('int'))
            self._verbose(f'{n_outliers} outliers removed')
            self.data = data_noout
            self._verbose(f'There are {self._n_gaps()} gaps in {self.data.shape}')

        self.gapfilled_data, self.gapfilled_data_flag = self._gapfill()

        gaps_perc = self._perc_gapfilled(self.gapfilled_data)
        self._verbose(f'{gaps_perc*100:.2f}% of the gaps filled in {(time.time() - start):.2f} segs')
        if gaps_perc < 1:
            self._verbose(f'Remained gaps: {self._n_gaps(self.gapfilled_data)}')

        return self.gapfilled_data

    @abstractmethod
    def _gapfill(self):
        # Concrete strategies return (gapfilled_data, gapfilled_data_flag)
        pass

    def save_rasters(self,
        out_dir,
        dtype:str = None,
        out_mantain_subdirs:bool = True,
        root_dir_name:str = DATA_ROOT_NAME,
        fn_files:List = None,
        nodata = None,
        spatial_win:Window = None,
        save_flag:bool = True,
    ):
        """
        Save the result in raster files maintaining the same filenames
        of the read rasters.

        :param out_dir: Folder path to save the files.
        :param dtype: Convert the rasters for the specified Numpy ``dtype`` before save. This argument overwrites the values
            retrieved of ``fn_files``.
        :param out_mantain_subdirs: Keep the full folder hierarchy of the read raster in the ``out_dir``.
        :param root_dir_name: Keep the relative folder hierarchy of the read raster in the ``out_dir``
            considering the sub folders of ``root_dir_name``.
        :param fn_files: Raster file paths to retrieve the filenames and folders. Use this parameter in
            situations where the ``data`` parameter is informed in the class constructor. The pixel size,
            crs, extent, image size and nodata for the gapfilled rasters are retrieved from the first valid
            raster of ``fn_files``.
        :param nodata: ``Nodata`` value used for the gapfilled rasters. This argument overwrites the values
            retrieved of ``fn_files``. This argument doesn't affect the flag rasters (gapfill summary), which have
            ``nodata=0``.
        :param spatial_win: Save the gapfilled rasters considering the specified spatial window.
        :param save_flag: Save the flag rasters (gapfill summary).
        :returns: List of saved file paths.
        """
        if fn_files is not None:
            self.fn_files = fn_files

        if self.fn_files is None:
            raise Exception('To use save_rasters you should provide a fn_files list.')

        # First existing raster is used as spatial reference for all outputs
        fn_base_img = None
        for i in range(0, len(self.fn_files)):
            if self.fn_files[i] is not None and self.fn_files[i].is_file():
                fn_base_img = self.fn_files[i]
                break

        n_files = len(self.fn_files)
        n_data = self.gapfilled_data.shape[2]
        if n_files != n_data:
            raise Exception(f'The fn_files incompatible with gapfilled_data shape ({self.gapfilled_data.shape})')

        if not isinstance(out_dir, Path):
            out_dir = Path(out_dir)

        fn_gapfilled_list = []
        fn_flag_list = []
        for i in range(0, n_files):
            src_fn = Path(self.fn_files[i])

            if out_mantain_subdirs:
                # Reproduce the sub-tree below root_dir_name inside out_dir
                cur_out_dir = out_dir.joinpath(str(src_fn.parent).split(root_dir_name)[-1][1:])
                fn_gapfilled = cur_out_dir.joinpath('%s.tif' % src_fn.stem)
                fn_flag = cur_out_dir.joinpath('%s_flag.tif' % src_fn.stem)
            else:
                fn_gapfilled = out_dir.joinpath('%s.tif' % src_fn.stem)
                fn_flag = out_dir.joinpath('%s_flag.tif' % src_fn.stem)

            fn_gapfilled.parent.mkdir(parents=True, exist_ok=True)
            fn_gapfilled_list.append(fn_gapfilled)
            fn_flag_list.append(fn_flag)

        # n_jobs added for consistency with the flag call below, so n_jobs_io
        # controls the IO parallelism of both outputs
        result = save_rasters(fn_base_img, fn_gapfilled_list, self.gapfilled_data,
            dtype=dtype, spatial_win=spatial_win, nodata=nodata, n_jobs=self.n_jobs_io)

        if save_flag:
            flag_result = save_rasters(fn_base_img, fn_flag_list, self.gapfilled_data_flag,
                dtype='uint8', spatial_win=spatial_win, nodata=0, n_jobs=self.n_jobs_io)
            result = result + flag_result

        self._verbose(f'Number of files saved in {out_dir}: {len(result)}')
        return result
class _TMWMData():
def __init__(self,
time_order: List,
time_data, time_win_size = 9,
cpu_max_workers:int = multiprocessing.cpu_count(),
engine='CPU',
gpu_tile_size:int = 250
):
self.time_win_size = time_win_size
self.time_order = time_order
self.time_data = time_data
self.cpu_max_workers = cpu_max_workers
self.gpu_tile_size = gpu_tile_size
self.engine = engine
self.gapfilled_data = {}
def _get_time_window(self, t, n_layers, win_size):
neib_size = int((win_size - 1) / 2)
return(
(t - neib_size) if (t - neib_size) > 0 else 0,
(t + neib_size) if (t + neib_size) < (n_layers-1) else (n_layers-1),
)
def _key_from_layer_pos(self, time, layer_pos, win_size):
_, _, n_layers = self.time_data[time].shape
t1, t2 = self._get_time_window(layer_pos, n_layers, win_size)
return self._key_from_time(time, t1, t2)
def _key_from_time(self, time, t1, t2):
return f'{time}_{t1}_{t2}'
def _calc_nanmedian(self, time, t1, t2):
with warnings.catch_warnings():
warnings.filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
result = bc.nanmedian(self.time_data[time][:,:,t1:t2+1].astype('float32'), axis=2)
return self._key_from_time(time, t1, t2), result.astype('float16')
def _cpu_processing(self, args):
for key, data in parallel.job(self._calc_nanmedian, iter(args), n_jobs=self.cpu_max_workers, joblib_args={'require': 'sharedmem'}):
self.gapfilled_data[key] = data
def _gpu_processing(self, args):
try:
import cupy as cp
except ImportError:
warnings.warn('GPU engine requires cupy>=9.2.0')
x_size, y_size, _ = self.time_data[self.time_order[0]].shape
for x1 in chain(range(0, x_size, self.gpu_tile_size), [self.gpu_tile_size]):
for y1 in chain(range(0, x_size, self.gpu_tile_size), [self.gpu_tile_size]):
gpu_data = {}
x2 = ( (x_size-1) if (x1 + self.gpu_tile_size) >= x_size else (x1 + self.gpu_tile_size))
y2 = ( (y_size-1) if (y1 + self.gpu_tile_size) >= y_size else (y1 + self.gpu_tile_size))
for time in self.time_order:
gpu_data[time] = cp.asarray(self.time_data[time][x1:x2,y1:y2,:])
for time, t1, t2 in iter(args):
key = self._key_from_time(time, t1, t2)
if key not in self.gapfilled_data:
self.gapfilled_data[key] = np.zeros((x_size, y_size), dtype='float32')
gpu_median = cp.nanmedian(gpu_data[time][:,:,t1:t2], axis=2)
self.gapfilled_data[key][x1:x2,y1:y2] = cp.asnumpy(gpu_median)
gpu_median = None
gpu_data = None
def run(self):
"""Perform the implemented gapfilling operation
"""
self.gapfilled_data = {}
self.available_windows = {}
args = set()
for time in self.time_order:
_, _, n_layers = self.time_data[time].shape
self.available_windows[time] = []
for layer_pos in range(0, n_layers):
self.available_windows[time].append([])
for w in range(self.time_win_size, n_layers, self.time_win_size):
t1, t2 = self._get_time_window(layer_pos, n_layers, w)
self.available_windows[time][layer_pos].append(w)
args.add((time, t1, t2))
ttprint(f'Calculating {len(args)} gap filling possibilities')
if self.engine == 'CPU':
ttprint(f'Using cpu engine')
self._cpu_processing(args)
elif self.engine == 'GPU':
ttprint(f'Using GPU engine')
self._gpu_processing(args)
ttprint(f'Possibilities calculated')
def get(self, time, layer_pos):
time_list = (time if type(time) == list else [time])
result = {}
for t in time_list:
try:
for win_size in (self.available_windows[t][layer_pos]):
result[win_size] = ( [] if win_size not in result else result[win_size])
key = self._key_from_layer_pos(t, layer_pos, win_size)
result[win_size].append(self.gapfilled_data[key])
except:
traceback.format_exc()
continue
for win_size in list(result.keys()):
win_stacked = np.stack(result[win_size], axis=2).astype('float32')
with warnings.catch_warnings():
warnings.filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
result[win_size] = bc.nanmedian(win_stacked, axis=2).astype('float16')
del win_stacked
return result
class TMWM2(ImageGapfill):
    """
    Temporal Moving Window Median (v2). Fills each date using the median of
    three families of temporal windows — same season, neighboring seasons and
    annual — preferring (via a per-pixel flag) the smallest applicable window
    and the same-season family first.
    """

    def __init__(self,
        fn_files:List = None ,
        data:np.array = None,
        season_size = None,
        time_win_size: int=9,
        precomputed_flags:np.array = None,
        max_same_season_win = None,
        max_neib_season_win = None,
        max_annual_win = None,
        time_win_direction = 'both',
        outlier_remover:OutlierRemover = None,
        std_win:int = 3,
        std_env:int = 2,
        perc_env:list = [2,98],
        n_jobs_io = 4,
        verbose = True
    ):
        super().__init__(fn_files=fn_files, data=data,
            outlier_remover=outlier_remover, std_win=std_win,
            std_env=std_env, perc_env=perc_env, n_jobs_io=n_jobs_io,
            verbose=verbose)

        self.time_win_size = time_win_size
        self.season_size = season_size
        # Reorder to (time, rows, cols) so one date is a contiguous 2D slice
        self.data = np.ascontiguousarray(self.data.transpose((2,0,1)))
        self.time_series_size = self.data.shape[0]
        self.tile_size = (self.data.shape[1], self.data.shape[2])
        self.n_years = int(self.time_series_size / self.season_size)
        self.precomputed_flags = precomputed_flags
        # 'both', 'past' or 'future': restricts which side of the window is used
        self.time_win_direction = time_win_direction

        # Window limits default to the full number of years
        self.max_same_season_win = max_same_season_win
        if self.max_same_season_win is None:
            self.max_same_season_win = self.n_years

        self.max_neib_season_win = max_neib_season_win
        if self.max_neib_season_win is None:
            self.max_neib_season_win = self.n_years

        self.max_annual_win = max_annual_win
        if self.max_annual_win is None:
            self.max_annual_win = self.n_years

        # Number of NaN cells per date; dates without gaps are skipped in _run
        self.summary_gaps = np.sum(
            np.isnan(self.data).astype('int8'),
            axis=(1,2)
        )

        if (self.time_win_size % 2) == 0:
            raise ValueError(f'The time_win_size argument must be an odd number')
        if (self.max_same_season_win < self.time_win_size):
            raise ValueError(f'The max_same_season_win argument can not be less than {time_win_size} (time_win_size)')
        if (self.max_neib_season_win < self.time_win_size):
            raise ValueError(f'The max_neib_season_win argument can not be less than {time_win_size} (time_win_size)')
        if (self.max_annual_win < self.time_win_size):
            raise ValueError(f'The max_annual_win argument can not be less than {time_win_size} (time_win_size)')

        self.n_jobs = multiprocessing.cpu_count()
        if self.time_series_size < self.n_jobs:
            self.n_jobs = self.time_series_size

    def _sanitize_win(self, idx):
        # Keep only indices inside [0, time_series_size)
        return idx[
            np.logical_and(
                idx >= 0,
                idx < self.time_series_size
            )
        ]

    def _season_win(self, i, t_margin):
        # Indices of the same season in surrounding years (stride season_size)
        margin = (self.season_size * t_margin)
        i_past = i - margin
        i_futu = i + margin + self.season_size
        if self.time_win_direction == 'past':
            i_futu = i
        elif self.time_win_direction == 'future':
            i_past = i
        return self._sanitize_win(
            np.arange(i_past, i_futu, self.season_size)
        )

    def _annual_win(self, i, t_margin):
        # NOTE(review): `start` is a year-scale index (i / season_size) but
        # `margin` is layer-scale (win_size * season_size), and the resulting
        # arange is used as layer indices; also `i_futu = i` mixes the two
        # scales for time_win_direction='past'. Looks inconsistent — verify
        # against the intended annual-window semantics before changing.
        win_size = (t_margin * 2) + 1
        start = int(i / self.season_size)
        margin = win_size * self.season_size
        i_past = start - margin
        i_futu = start + margin
        if self.time_win_direction == 'past':
            i_futu = i
        elif self.time_win_direction == 'future':
            i_past = i
        return self._sanitize_win(
            np.arange(i_past, i_futu, 1)
        )

    def _time_windows(self, i):
        # Build every candidate window for date i. Flag encoding:
        #   same-season  -> idx = t
        #   neib-season  -> idx = t + n_years
        #   annual       -> idx = t + 2 * n_years
        # where t grows with the window size, so smaller flags mean smaller
        # (preferred) windows.
        win_map = {}
        win_idx = []
        for j in range(0, self.n_years - self.time_win_size):
            t = ((j * 2) + self.time_win_size)
            t_margin = int((t - 1) / 2)
            win_size = (t_margin * 2) + 1

            same_season_win = self._season_win(i, t_margin)
            if len(same_season_win) > 0 and win_size <= self.max_same_season_win:
                same_season_idx = t
                win_idx.append(same_season_idx)
                win_map[same_season_idx] = same_season_win

            # Neighboring seasons: one step before and one step after date i
            neib_season_win = np.concatenate([
                self._season_win(i-1, t_margin),
                self._season_win(i+1, t_margin)
            ])
            if len(neib_season_win) > 0 and win_size <= self.max_neib_season_win:
                neib_season_idx = t + self.n_years
                win_idx.append(neib_season_idx)
                win_map[neib_season_idx] = neib_season_win

            annual_win = self._annual_win(i, t_margin)
            if len(annual_win) > 0 and win_size <= self.max_annual_win:
                annual_idx = t + (2 * self.n_years)
                win_idx.append(annual_idx)
                win_map[annual_idx] = annual_win

        win_idx.sort()
        return (win_idx, win_map)

    def _win_choice(self, win_map, idx, i):
        # Per-pixel: idx where window `idx` can fill a gap of date i, else NaN
        to_gapfill = self.data[i,:,:]
        to_reduce = self.data[win_map[idx]]
        availability_mask = np.logical_and(
            np.isnan(to_gapfill),
            ~np.isnan(bc.nanmin(to_reduce, axis=(0)))
        )
        r = np.empty(self.tile_size, dtype='float32')
        r[:,:] = np.nan
        r[availability_mask] = idx
        return r
    def _flags(self, i, win_idx, win_map):
        # nanmin over all candidates keeps the smallest (highest-priority) flag
        win_choice = []
        for idx in win_idx:
            r = self._win_choice(win_map, idx, i)
            win_choice.append(r)
        win_choice = bc.nanmin(np.stack(win_choice, axis=0), axis=0)
        gc.collect()
        return win_choice

    def _reduce(self, i, to_reduce, mask):
        # Median over the window layers, restricted to the masked pixels
        mask_b = np.broadcast_to(mask, to_reduce.shape)
        to_reduce[~mask_b] = np.nan
        return bc.nanmedian(to_reduce, axis=0), mask

    def _run(self, i):
        # Gapfill one date. NOTE(review): the bare except silently returns the
        # unfilled date on any error — consider at least logging.
        n_gaps = self.summary_gaps[i]
        try:
            if n_gaps > 0:
                win_idx, win_map = self._time_windows(i)
                if self.precomputed_flags is None:
                    flags = self._flags(i, win_idx, win_map)
                    flags[np.isnan(flags)] = 0
                    flags = flags.astype('uint8')
                else:
                    flags = self.precomputed_flags[i,:,:]
                flags_uniq = np.unique(flags[flags != 0])
                args = [ (i, self.data[win_map[idx]], (flags == idx)) for idx in flags_uniq ]
                gapfilled = self.data[i,:,:].copy()
                for arg in args:
                    reduced, mask = self._reduce(*arg)
                    gapfilled[mask] = reduced[mask]
                return gapfilled, flags
            else:
                # Nothing to fill: flag 0 everywhere
                return self.data[i,:,:], np.zeros(self.tile_size, dtype='uint8')
        except:
            return self.data[i,:,:], np.zeros(self.tile_size, dtype='uint8')
        finally:
            gc.collect()

    def _gapfill(self):
        args = [ (i,) for i in range(self.time_series_size) ]
        gapfilled_data = []
        gapfilled_data_flag = []

        # Only used for the log message below (possibilities per date)
        win_idx, win_map = self._time_windows(int(self.time_series_size/2))
        ttprint(f"Deriving {len(win_idx)} gap filling possibilities for each date")

        for gapfilled, flags in parallel.job(self._run, args, n_jobs=self.n_jobs, joblib_args={'backend': 'multiprocessing'}):
            gapfilled_data.append(gapfilled)
            gapfilled_data_flag.append(flags)

        gapfilled_data = np.stack(gapfilled_data, axis=0)
        gapfilled_data_flag = np.stack(gapfilled_data_flag, axis=0)
        return(gapfilled_data, gapfilled_data_flag)
class TMWM(ImageGapfill):
    """
    Temporal Moving Window Median able to gapfill the missing pixels using the temporal neighborhood
    by a moving window to calculate several median possibilities. The approach prioritizes
    values derived for **1–the same day/month/season**, **2–neighboring days/months/seasons**,
    **and 3–all the year**.

    For example, in the best case scenario a missing pixel on Jan-2005 is filled using a median
    value derived from January of other years, and in the worst case scenario it uses a value derived
    of all the months and available years. If a pixel remains with a missing value, it is because
    there is **no valid data on the entire time series**.

    :param fn_files: Raster file paths to be read and gapfilled.
    :param data: 3D array where the last dimension is the time.
    :param season_size: Season size of a year (for monthly time series it is equal ``12``).
    :param time_win_size: Size of the temporal window used to calculate the median value possibilities.
    :param cpu_max_workers: Number of CPU cores to be used in parallel.
    :param engine: Execute in ``CPU`` [1] or ``GPU`` [2].
    :param gpu_tile_size: Tile size used to split the processing in the GPU.
    :param outlier_remover: Strategy to remove outliers.
    :param std_win: Temporal window size used to calculate a local median and std.
    :param std_env: Number of std used to define a local envelope around the median.
        Values outside of this envelope are removed.
    :param perc_env: A list containing the lower and upper percentiles used to defined a global
        envelope for the time series. Values outside of this envelope are removed.
    :param n_jobs_io: Number of parallel jobs to read/write raster files.
    :param verbose: Use ``True`` to print the progress of the gapfilled.

    Examples
    ========

    >>> from eumap import gapfiller
    >>>
    >>> # For a 4-season time series
    >>> tmwm = gapfiller.TMWM(fn_files=fn_rasters, season_size=4, time_win_size=4)
    >>> data_tmwm = tmwm.run()
    >>>
    >>> fn_rasters_tmwm = tmwm.save_rasters('./gapfilled_tmwm')

    References
    ==========

    [1] `Bootleneck nanmedian <https://kwgoodman.github.io/bottleneck-doc/reference.html#bottleneck.nanmedian>`_

    [2] `CuPY nanmedian <https://docs.cupy.dev/en/stable/reference/generated/cupy.nanmedian.html>`_
    """

    def __init__(self,
        fn_files:List = None ,
        data:np.array = None,
        season_size = None,
        time_win_size: int=9,
        cpu_max_workers:int = multiprocessing.cpu_count(),
        engine='CPU',
        gpu_tile_size:int = 250,
        outlier_remover:OutlierRemover = None,
        std_win:int = 3,
        std_env:int = 2,
        perc_env:list = [2,98],
        n_jobs_io = 4,
        verbose = True
    ):
        super().__init__(fn_files=fn_files, data=data,
            outlier_remover=outlier_remover, std_win=std_win,
            std_env=std_env, perc_env=perc_env, n_jobs_io=n_jobs_io,
            verbose=verbose)

        self.cpu_max_workers = cpu_max_workers
        self.time_win_size = time_win_size
        self.engine = engine
        self.gpu_tile_size = gpu_tile_size
        self.season_size = season_size

        # An odd window is required so the window is centered on each layer
        if (self.time_win_size % 2) == 0:
            raise ValueError(f'The time_win_size argument must be an odd number')

    def _do_time_data(self):
        # Split the (rows, cols, time) cube into one cube per season:
        # time_data['k'] holds the layers of season k across all years
        total_times = self.data.shape[2]
        self.time_order = [ str(time) for time in range(0, self.season_size) ]
        self.time_data = {}
        self.n_years = 0
        for time in self.time_order:
            idx = list(range(int(time), total_times, self.season_size))
            if len(idx) > self.n_years:
                self.n_years = len(idx)
            self.time_data[time] = self.data[:,:,idx]
            self._verbose(f"Data {self.time_data[time].shape} organized in time={time}")

    def _get_neib_times(self, time):
        # For season `time`, yield pairs (before_seasons, after_seasons) of
        # growing size j, walking the season ring in both directions
        total_times = len(self.time_order)
        i = self.time_order.index(time)

        time_order_rev = self.time_order.copy()
        time_order_rev.reverse()

        neib_times = []
        for j in range(1, math.ceil(total_times / 2) + 1):
            # Setup slicer in the right positions
            after = islice(cycle(self.time_order), i, None)
            before = islice(cycle(time_order_rev), total_times-i, None)
            next(after)
            before_tim = [next(before) for t in range(0,j)]
            after_tim = [next(after) for t in range(0,j)]
            neib_times.append((before_tim, after_tim))

        return neib_times

    def _fill_gaps(self, time, layer_pos, newdata_dict, verbose_suffix='', gapflag_offset = 0):
        # Fill gaps of one layer with each candidate image, writing the window
        # size (+ offset) into time_data_gaps as a provenance flag.
        # NOTE(review): `keys` is sorted but the loop iterates
        # newdata_dict.keys() (insertion order) — presumably the sorted list
        # was intended so smaller windows are applied first; verify.
        end_msg = None
        keys = list(newdata_dict.keys())
        keys.sort()
        for i in newdata_dict.keys():
            gaps_mask = np.isnan(self.time_data[time][:,:,layer_pos])
            n_gaps = np.sum(gaps_mask.astype('int'))
            if n_gaps > 0:
                newdata = newdata_dict[i]
                gapfill_mask = np.logical_and(~np.isnan(newdata),gaps_mask)
                self.time_data[time][:,:,layer_pos][gapfill_mask] = newdata[gapfill_mask]
                self.time_data_gaps[time][:,:,layer_pos][gapfill_mask] = int(i) + gapflag_offset
            else:
                end_msg = f'Layer {layer_pos}: gapfilled all with {i}-year window from {verbose_suffix})'
                break
        return end_msg

    def _fill_gaps_all_times(self, time, layer_pos):
        # Last resort: medians over all seasons (flag offset 2 * n_layers)
        _, _, n_layers = self.time_data[time].shape
        end_msg = None
        all_data = self.tmwm_data.get(self.time_order, layer_pos)
        end_msg = self._fill_gaps(time, layer_pos, all_data, verbose_suffix='all seasons', gapflag_offset = n_layers*2)
        return end_msg

    def _fill_gaps_neib_time(self, time, layer_pos):
        # Second priority: average the before/after season medians of matching
        # window sizes (flag offset n_layers)
        newdata_dict = {}
        end_msg = None
        _, _, n_layers = self.time_data[time].shape
        for before_times, after_times in self._get_neib_times(time):
            before_data = self.tmwm_data.get(before_times, layer_pos)
            after_data = self.tmwm_data.get(after_times, layer_pos)

            keys = list(set(before_data.keys()).intersection(after_data.keys()))
            keys.sort()
            for i in keys:
                stacked = np.stack([before_data[i], after_data[i]], axis=2)
                # Only average where BOTH sides are valid
                valid_mean = np.any(np.isnan(stacked), axis=2)
                stacked[valid_mean] = np.nan
                newdata_dict[i] = np.nanmean(stacked, axis=2)

            end_msg = self._fill_gaps(time, layer_pos, newdata_dict, verbose_suffix='neighborhood seasons', gapflag_offset = n_layers)
            if end_msg is not None:
                break

        return end_msg

    def _fill_gaps_same_time(self, time, layer_pos):
        # First priority: medians from the same season of other years
        newdata_dict = self.tmwm_data.get(time, layer_pos)
        return self._fill_gaps(time, layer_pos, newdata_dict, verbose_suffix='same season')

    def fill_image(self, time, layer_pos):
        # Apply the three strategies in priority order until the layer is full
        end_msg = self._fill_gaps_same_time(time, layer_pos)
        if end_msg is None:
            end_msg = self._fill_gaps_neib_time(time, layer_pos)
        if end_msg is None:
            end_msg = self._fill_gaps_all_times(time, layer_pos)
        return end_msg

    def _gapfill(self):
        self._do_time_data()
        self.time_data_gaps = {}

        self.tmwm_data = _TMWMData(self.time_order, self.time_data, self.time_win_size,
            cpu_max_workers=self.cpu_max_workers, engine=self.engine, gpu_tile_size=self.gpu_tile_size)
        self.tmwm_data.run()

        layer_args = []
        for time in self.time_order:
            nrows, ncols, n_layers = self.time_data[time].shape
            self.time_data_gaps[time] = np.zeros((nrows, ncols, n_layers), dtype='int8')
            for layer_pos in range(0, n_layers):
                layer_args.append((time, layer_pos))

        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'Mean of empty slice')
            # sharedmem: fill_image mutates time_data/time_data_gaps in place;
            # the loop only drains the iterator (results are not used)
            for end_msg in parallel.job(self.fill_image, iter(layer_args), n_jobs=self.cpu_max_workers, joblib_args={'require': 'sharedmem'}):
                end_msg = True

        # Re-interleave the per-season cubes back into chronological order.
        # NOTE(review): the guard is independent of i; for a season with fewer
        # than n_years layers, indexing [:,:,i] would fail — presumably the
        # intent was `i < time_len`; verify with a non-multiple series length.
        gapfilled_data = []
        gapfilled_data_flag = []
        for i in range(0, self.n_years):
            for time in self.time_order:
                time_len = self.time_data[time].shape[2]
                if (time_len <= self.n_years):
                    gapfilled_data.append(self.time_data[time][:,:,i])
                    gapfilled_data_flag.append(self.time_data_gaps[time][:,:,i])

        gapfilled_data = np.stack(gapfilled_data, axis=2)
        gapfilled_data_flag = np.stack(gapfilled_data_flag, axis=2)

        return gapfilled_data, gapfilled_data_flag
class TLI(ImageGapfill):
    """
    Temporal Linear Interpolation able to gapfill the missing pixels using a linear regression [1]
    over the time using all valid pixels.

    :param fn_files: Raster file paths to be read and gapfilled.
    :param data: 3D array where the last dimension is the time.
    :param outlier_remover: Strategy to remove outliers.
    :param std_win: Temporal window size used to calculate a local median and std.
    :param std_env: Number of std used to define a local envelope around the median.
        Values outside of this envelope are removed.
    :param perc_env: A list containing the lower and upper percentiles used to define a global
        envelope for the time series. Values outside of this envelope are removed.
        Defaults to ``[2, 98]``.
    :param n_jobs_io: Number of parallel jobs to read/write raster files.
    :param verbose: Use ``True`` to print the progress of the gapfilling.

    Examples
    ========

    >>> from eumap import gapfiller
    >>>
    >>> tli = gapfiller.TLI(fn_files=fn_rasters)
    >>> data_tli = tli.run()
    >>>
    >>> fn_rasters_tli = tli.save_rasters('./gapfilled_tli')

    References
    ==========

    [1] `Scikit-learn linear regression <https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LinearRegression.html>`_
    """

    def __init__(self,
        fn_files:List = None,
        data:np.array = None,
        outlier_remover:OutlierRemover = None,
        std_win:int = 3,
        std_env:int = 2,
        perc_env:list = None,
        n_jobs_io:int = 4,
        verbose:bool = True
    ):
        # Default resolved here instead of using a shared mutable default argument
        if perc_env is None:
            perc_env = [2, 98]
        super().__init__(fn_files=fn_files, data=data,
            outlier_remover=outlier_remover, std_win=std_win,
            std_env=std_env, perc_env=perc_env, n_jobs_io=n_jobs_io,
            verbose=verbose)

    def _temporal_linear_inter(self, data):
        # Fit y = a*t + b over the valid dates of one pixel time series and
        # predict the missing ones. The flag layer carries the training RMSE
        # on the gapfilled dates (0 elsewhere).
        y = data.reshape(-1).copy()
        y_nan = np.isnan(y)
        y_size = data.shape[0]
        y_valid = y[~y_nan]
        n_gaps = np.sum(y_nan.astype('int'))
        data_flag = np.zeros(y.shape)

        # At least 4 valid observations are required to fit the trend
        if n_gaps > 0 and y_valid.shape[0] > 3:
            X = np.arange(y_size)
            model = LinearRegression()
            model.fit(X[~y_nan].reshape(-1, 1), y_valid)
            y_pred = model.predict(X.reshape(-1, 1))
            # math.sqrt(MSE) instead of mean_squared_error(..., squared=False):
            # the `squared` parameter is deprecated since scikit-learn 1.4 and
            # removed in 1.6
            rmse = math.sqrt(mean_squared_error(y[~y_nan], y_pred[~y_nan]))

            y[y_nan] = y_pred[y_nan]
            data_flag[y_nan] = rmse

            return np.stack([ y.reshape(data.shape), data_flag.reshape(data.shape) ], axis=1)
        else:
            # Nothing to do (or too few samples): return the input unchanged
            return np.stack([ data, data_flag ], axis=1)

    def _gapfill(self):
        result = parallel.apply_along_axis(self._temporal_linear_inter, 2, self.data)
        return (result[:,:,:,0], result[:,:,:,1])
[docs] class SSA(ImageGapfill):
"""
Approach that uses a Singular Spectral Analysis (SSA [1]) to gapfill the missing
values and smooth all the raster data. The missing values are first gapfilled using
a long-term median strategy derived over values from other days/months/seasons. Later
the SSA is uses to decompose each time series in multiple components (``ngroups``),
considering only part of them to reconstruct the output time series.
(``reconstruct_ngroups``).
:param fn_files: Raster file paths to be read and gapfilled.
:param data: 3D array where the last dimension is the time.
:param season_size: Season size of a year used to calculate
the long-term median (for monthly time series it is equal ``12``).
:param max_gap_pct: Max percentage allowed to run the approach. For pixels
where this condition is satisfied the result is ``np.nan`` for all dates.
:param ltm_resolution: Number of years used to calculate the long-term median.
:param window_size: Size of the sliding window (i.e. the size of each word).
If float, it represents the percentage of the size of each time series and must be between 0 and 1.
The window size will be computed as ``max(2, ceil(window_size * n_timestamps))`` [1].
:param ngroups: Number of components used to decompose the time series [1].
:param reconstruct_ngroups: Number of components used to reconstruct the time series.
:param outlier_remover: Strategy to remove outliers.
:param std_win: Temporal window size used to calculate a local median and std.
:param std_env: Number of std used to define a local envelope around the median.
Values outside of this envelope are removed.
:param perc_env: A list containing the lower and upper percentiles used to defined a global
envelope for the time series. Values outside of this envelope are removed.
:param n_jobs_io: Number of parallel jobs to read/write raster files.
:param verbose: Use ``True`` to print the progress of the gapfilled.
Examples
========
>>> from eumap import gapfiller
>>>
>>> # For a 4-season time series
>>> ssa = gapfiller.SSA(fn_files=fn_rasters, season_size=4)
>>> data_ssa = ssa.run()
>>>
>>> fn_rasters_ssa = ssa.save_rasters('./gapfilled_ssa', dtype='uint8', save_flag=False)
References
==========
[1] `Pyts SingularSpectrumAnalysis <https://pyts.readthedocs.io/en/stable/generated/pyts.decomposition.SingularSpectrumAnalysis.html>`_
"""
def __init__(self,
fn_files:List = None ,
data:np.array = None,
season_size:int = 4,
max_gap_pct:int = 0.8,
ltm_resolution:int = 5,
window_size:int = 4,
ngroups:int = 4,
reconstruct_ngroups:int = 2,
outlier_remover:OutlierRemover = None,
std_win:int = 3,
std_env:int = 2,
perc_env:list = [2,98],
n_jobs_io = 4,
verbose = True
):
super().__init__(fn_files=fn_files, data=data,
outlier_remover=outlier_remover, std_win=std_win,
std_env=std_env, perc_env=perc_env, n_jobs_io=n_jobs_io,
verbose=verbose)
self.season_size = season_size
self.max_gap_pct = max_gap_pct
self.ltm_resolution = ltm_resolution
self.window_size = window_size
self.ngroups = ngroups
self.reconstruct_ngroups = reconstruct_ngroups
def gapfill_ltm(self, ts_data, season_size=12, agg_year=5):
ts_size = ts_data.shape[1]
agg_size = (season_size * agg_year)
i_list = [ (i0, (ts_size if (i0 + agg_size) > ts_size else (i0 + agg_size))) for i0 in range(0,ts_size,agg_size) ]
arr_ltm = []
for i0, i1 in i_list:
ts_year = ts_data[0, i0:i1].reshape(-1, season_size)
if self._perc_gaps(ts_year) == 1.0:
ts_ltm = np.empty((i1 - i0))
ts_ltm[:] = np.nan
else:
ts_ltm = bc.nanmean(ts_year, axis=0)
repetions = int((i1 - i0) / season_size)
ts_ltm = np.tile(ts_ltm, repetions)
arr_ltm.append(ts_ltm)
ts_ltm = np.concatenate(arr_ltm).reshape(ts_data.shape)
return ts_ltm
def _all_nan(self, data):
return np.all(np.isnan(data))
def _perc_gaps(self, data):
return np.sum(np.isnan(data).astype('int')) / data.flatten().shape[0]
def _ssa_reconstruction(self, data):
  """
  Gapfill and reconstruct a single pixel time series using SSA.

  The series is first gapfilled with long-term seasonal means computed
  over progressively larger aggregation windows (``gapfill_ltm``); any
  remaining gaps are filled with the global mean so SSA receives a
  complete series. The reconstruction is the sum of the first
  ``reconstruct_ngroups`` SSA components.

  :param data: 1D (or reshapeable to 1D) pixel time series.
  :returns: Array stacking the reconstructed series and a 0/1 flag
    marking which positions were originally NaN, shaped like ``data``
    along ``axis=1``.
  """
  # 'float32' (lowercase) is the canonical numpy dtype name; the
  # capitalized 'Float32' alias was removed from modern numpy.
  ts_data = data.reshape(1, -1).astype('float32')
  ts_flag = np.zeros(ts_data.shape)
  # All-NaN series: nothing can be reconstructed, return as-is.
  if self._all_nan(ts_data):
    return np.stack([ ts_data.reshape(data.shape),
      ts_flag.reshape(data.shape) ], axis=1)
  na_mask_initial = np.isnan(ts_data)
  ts_gapfilled = ts_data.copy()
  n_years = int(ts_gapfilled.shape[1] / self.season_size)
  if self._perc_gaps(ts_gapfilled.flatten()) <= self.max_gap_pct:
    # Grow the aggregation window until the series is gap free (or the
    # window options are exhausted).
    for year in range(self.ltm_resolution, n_years, self.ltm_resolution):
      ts_gapfilled_y = self.gapfill_ltm(ts_data, season_size=self.season_size, agg_year=year)
      gapfill_mask = np.logical_and(np.isnan(ts_gapfilled), ~np.isnan(ts_gapfilled_y))
      ts_gapfilled[gapfill_mask] = ts_gapfilled_y[gapfill_mask]
      if self._perc_gaps(ts_gapfilled) == 0:
        break
  # Always true here: the all-NaN case returned early and gapfilling
  # only adds values, so at least one observation exists.
  if self._perc_gaps(ts_gapfilled.flatten()) < 1.0:
    # Fall back to the global mean for whatever gaps remain, since SSA
    # cannot handle NaN input.
    gapfill_mask = np.isnan(ts_gapfilled)
    ts_mean = bc.nanmean(ts_gapfilled)
    ts_gapfilled[gapfill_mask] = ts_mean
    ssa = SingularSpectrumAnalysis(window_size=self.window_size, groups=self.ngroups)
    ts_components = ssa.fit_transform(ts_gapfilled)
    ts_reconstructed = np.sum(ts_components[0:self.reconstruct_ngroups,:], axis=0)
  ts_flag[na_mask_initial] = 1
  return np.stack([ts_reconstructed.reshape(data.shape), ts_flag.reshape(data.shape)], axis=1)
def _gapfill(self):
  """Reconstruct every pixel time series and split data/flag planes."""
  # apply_along_axis returns, per pixel, the stacked (series, flag) pair.
  reconstructed = parallel.apply_along_axis(self._ssa_reconstruction, 2, self.data)
  data_plane = reconstructed[..., 0]
  flag_plane = reconstructed[..., 1]
  return (data_plane, flag_plane)
class InPainting(ImageGapfill):
  """
  Spatial gapfilling based on the OpenCV inpainting algorithms [1],
  which estimate each missing pixel from its spatial neighborhood.

  :param fn_files: Raster file paths to be read and gapfilled.
  :param data: 3D array where the last dimension is the time.
  :param space_win: Radius of a circular neighborhood of each point inpainted that is considered by the algorithm.
  :param data_mask: 2D array indicating valid areas, equal 1, where gaps should be filled.
  :param mode: Inpainting method that could be cv::INPAINT_NS or cv::INPAINT_TELEA [1]
  :param outlier_remover: Strategy to remove outliers.
  :param std_win: Temporal window size used to calculate a local median and std.
  :param std_env: Number of std used to define a local envelope around the median.
    Values outside of this envelope are removed.
  :param perc_env: A list containing the lower and upper percentiles used to define a global
    envelope for the time series. Values outside of this envelope are removed.
  :param n_jobs_io: Number of parallel jobs to read/write raster files.
  :param verbose: Use ``True`` to print the progress of the gapfilling.

  Examples
  ========

  >>> from eumap import gapfiller
  >>>
  >>> # Considerer land_mask as 2D numpy array where 1 indicates land
  >>> inPainting = gapfiller.InPainting(fn_files=fn_rasters, space_win = 10, data_mask=land_mask)
  >>> data_inp = inPainting.run()
  >>>
  >>> fn_rasters_inp = inPainting.save_rasters('./gapfilled_inp')

  References
  ==========

  [1] `OpenCV Tutorial - Image Inpainting <https://docs.opencv.org/4.5.2/df/d3d/tutorial_py_inpainting.html>`_
  """

  def __init__(self,
    fn_files:List = None ,
    data:np.array = None,
    space_win = 10,
    data_mask = None,
    mode = cv.INPAINT_TELEA,
    outlier_remover:OutlierRemover = None,
    std_win:int = 3,
    std_env:int = 2,
    perc_env:list = [2,98],
    n_jobs_io = 4,
    verbose = True
  ):
    super().__init__(fn_files=fn_files, data=data,
      outlier_remover=outlier_remover, std_win=std_win,
      std_env=std_env, perc_env=perc_env, n_jobs_io=n_jobs_io,
      verbose=verbose)
    self.data_mask = data_mask
    self.space_win = space_win
    self.mode = mode

  def _inpaint(self, data, i=0):
    """Inpaint the gaps of one 2D band, returning (result, band index)."""
    # Copy first: cv.inpaint needs the NaN cells pre-filled in place.
    band = np.copy(data)
    with warnings.catch_warnings():
      # An all-NaN band would make nanmedian warn; silence that case.
      warnings.filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
      fill_value = np.nanmedian(band)
      gap_mask = np.isnan(band)
      band[gap_mask] = fill_value
      restored = cv.inpaint(band.astype('float32'), gap_mask.astype('uint8'), self.space_win, self.mode)
    return (restored, i)

  def _gapfill(self):
    """Inpaint every date with gaps in parallel; returns (data, flag)."""
    data = np.copy(self.data)
    data_flag = np.zeros(data.shape)

    if self.data_mask is None:
      # No mask provided: every pixel is considered fillable.
      self.data_mask = np.ones(data.shape[0:2])

    n_times = data.shape[2]
    # Only the dates that actually contain gaps are sent to the workers.
    args = [ (data[:,:,i], i) for i in range(0, n_times) if self._n_gaps(data[:,:,i]) > 0 ]

    inpainted = {}
    for band_data, i in parallel.job(self._inpaint, args, n_jobs=multiprocessing.cpu_count()):
      inpainted[i] = band_data

    for i in range(0, n_times):
      if i not in inpainted:
        continue
      band = inpainted[i]
      # Fill only pixels that were NaN, got a value, and fall inside the mask.
      fill_mask = np.logical_and(~np.isnan(band), np.isnan(data[:,:,i]))
      fill_mask = np.logical_and(fill_mask, (self.data_mask == 1))
      data[:,:,i][fill_mask] = band[fill_mask]
      data_flag[:,:,i][fill_mask] = 1

    return data, data_flag
def time_first_space_later(
  fn_files:List = None,
  data:np.array = None,
  time_strategy:ImageGapfill = TMWM,
  time_args:dict = None,
  space_strategy:ImageGapfill = InPainting,
  space_args:dict = None,
  space_flag_val = 100
):
  """
  Helper function to gapfill all the missing pixels using
  first a temporal strategy (``TMWM``, ``TLI``, ``SSA``) and later
  a spatial strategy (``InPainting``).

  :param fn_files: Raster file paths to be read and gapfilled.
  :param data: 3D array where the last dimension is the time.
  :param time_strategy: One of the implemented temporal gapfilling approaches.
  :param time_args: A ``dict`` of parameters for the temporal gapfilling strategy
  :param space_strategy: One of the implemented spatial gapfilling approaches.
  :param space_args: A ``dict`` of parameters for the spatial gapfilling strategy.
  :param space_flag_val: The flag value used to indicate which pixels were gapfilled
    by the spatial gapfilling strategy.

  Examples
  ========

  >>> from eumap import gapfiller
  >>>
  >>> # For a 4-season time series
  >>> tfsl = gapfiller.time_first_space_later(
  >>>  fn_files = fn_rasters,
  >>>  time_strategy = gapfiller.TMWM,
  >>>  time_args = { 'season_size': 4 },
  >>>  space_strategy = gapfiller.InPainting,
  >>>  space_args = { 'space_win': 10 }
  >>> )
  >>>
  >>> fn_rasters_tfsl = tfsl.save_rasters('./gapfilled_tmwm_inpaint', dtype='uint8', fn_files=fn_rasters)
  """
  # Copy the argument dicts: the previous mutable `{}` defaults were
  # mutated in place, leaking state across calls and into the caller.
  time_args = dict(time_args) if time_args else {}
  space_args = dict(space_args) if space_args else {}

  time_args['fn_files'] = fn_files
  time_args['data'] = data

  # `time_gapfiller` (not `time`) avoids shadowing the stdlib time module
  # imported at the top of this file.
  time_gapfiller = time_strategy(**time_args)
  time_gapfilled = time_gapfiller.run()

  time_gapfilled_pct = time_gapfiller._perc_gapfilled(time_gapfiller.gapfilled_data)
  if time_gapfilled_pct >= 1.0:
    # Temporal pass already filled everything; no spatial pass needed.
    return time_gapfiller

  space_args['data'] = time_gapfilled
  space = space_strategy(**space_args)
  space.run()

  # Re-flag the spatially filled pixels with space_flag_val so callers can
  # tell the two gapfilling passes apart.
  space_mask = (space.gapfilled_data_flag == 1)
  time_gapfiller.gapfilled_data_flag[space_mask] = space_flag_val
  space.gapfilled_data_flag = time_gapfiller.gapfilled_data_flag

  return space
except ImportError as e:
from .misc import _warn_deps
_warn_deps(e, 'gapfiller')