'''
Parallel block-wise processing and result aggregation for large raster datasets
'''
try:
import pygeos as pg
import rasterio as rio
import rasterio.features as rfeatures
import numpy as np
import sys
from shapely import speedups
if speedups.available:
speedups.enable()
import shapely.geometry as g
from multiprocessing.pool import ThreadPool
import multiprocessing as mp
import threading
from datetime import datetime
from typing import Union, Tuple, Iterable, Callable, Iterator
def _read_block(
src: Union[rio.DatasetReader, Iterable[rio.DatasetReader]],
window: rio.windows.Window,
geometry: Union[dict, None]=None,
band: int=1,
):
if isinstance(src, rio.DatasetReader):
src = [src]
mask = src[0].read_masks(band, window=window).astype(bool)
for s in src[1:]:
mask = mask & s.read_masks(band, window=window).astype(bool)
if not mask.any():
return None
if geometry is not None:
gmask = rfeatures.rasterize(
[geometry],
out_shape=(window.height, window.width),
transform=src[0].window_transform(window),
fill=0,
).astype(bool)
if not gmask.any():
return None
mask = mask & gmask
if not mask.any():
return None
data = np.stack([
s.read(band, window=window)
for s in src
])
return data[:, mask], mask, window
def _id(x):
return x
class _RasterBlockFunction:
def __init__(self,
func: Callable,
# agg_func: Callable=_id, # should not be done here
return_data_only: bool=False,
):
self.func = func
# self.agg_func = agg_func # should not be done here
self.return_data_only = return_data_only
def __call__(self, args):
data, mask, window = args
result = self.func(*data)
# result = self.agg_func(result) # should not be done here
if self.return_data_only:
return result
return result, mask, window
[docs] class RasterBlockReader:
"""
Thread-parallel reader for large rasters.
If ``reference_file`` is not ``None``, builds an R-tree index [1] of the block geometries read from the ``reference_file`` on initialization. All rasters read with the initialized reader are assumed to have identical geotransforms and block structures to the reference.
:param reference_file: Path (URL) of the reference raster.
For full usage examples please refer to the block processing tutorial notebook [2].
Examples
========
>>> from eumap.parallel.blocks import RasterBlockReader
>>> from eumap.misc import ttprint
>>>
>>> fp = 'https://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_landcover.hcl_lucas.corine.rf_p_30m_0..0cm_2019_eumap_epsg3035_v0.1.tif'
>>>
>>> ttprint('initializing reader')
>>> reader = RasterBlockReader(fp)
>>> ttprint('reader initialized')
References
==========
[1] `pygeos STRTree <https://pygeos.readthedocs.io/en/latest/strtree.html>`_
[2] `Raster block processing tutorial <../notebooks/06_raster_block_processing.html>`_
"""
def __init__(self,
reference_file: str=None,
):
self.reference = None
if reference_file is not None:
self._build_rtree(reference_file)
def _open(self,
src_path,
):
src = rio.open(src_path)
if all((
src.crs == self.reference.crs,
src.transform == self.reference.transform,
src.width == self.reference.width,
src.height == self.reference.height,
)):
return src
return rio.vrt.WarpedVRT(
src,
resampling=rio.enums.Resampling.nearest,
crs=self.reference.crs,
transform=self.reference.transform,
width=self.reference.width,
height=self.reference.height,
)
def _build_rtree(self, reference_file):
self.reference = rio.open(reference_file)
self.block_windows = np.array([
tup[1]
for tup in self.reference.block_windows()
])
boxes = self._blocks2boxes(self.block_windows)
self.rtree = pg.strtree.STRtree(boxes)
__ = self.rtree.query(boxes[0])
def _get_block_indices(self,
geometry: dict,
):
_geom = pg.from_shapely(g.shape(geometry))
return self.rtree.query(_geom)
def _blocks2boxes(self, block_windows):
coords = np.array([
self.reference.window_bounds(bw)
for bw in block_windows
])
return pg.box(*coords.T)
[docs] def read_overlay(self,
src_path: Union[str, Iterable[str]],
geometry: dict,
band: int=1,
geometry_mask: bool=True,
max_workers: int=mp.cpu_count(),
optimize_threadcount: bool=True,
) -> Iterator[Tuple[np.ndarray, np.ndarray, rio.windows.Window]]:
"""
Thread-parallel reading of large rasters within a bounding geometry.
Only blocks that intersect with ``geometry`` are read. Returns a generator yielding ``(data, mask, window)`` tuples for each block, where ``data`` are the stacked pixel values of all rasters at ``mask==True``,
``mask`` is the reduced (via bitwise ``and``) block data mask for all rasters, and ``window`` is the ``rasterio.windows.Window`` [1] for the block within the transform of the ``reference_file``.
All rasters read with the initialized reader are assumed to have identical geotransforms and block structures to the ``reference_file`` used for initialization.
If the reader was initialized with ``reference_file==None``, the first file in ``src_path`` is used as the reference and the block R-tree is built before yielding data from the first block.
:param src_path: Path(s) (or URLs) of the raster file(s) to read.
:param geometry: The bounding geometry within which to read raster blocks, given as a dictionary (with the GeoJSON geometry schema).
:param band: Index of band to read from all rasters.
:param geometry_mask: Indicates wheather or not to use the geometry as a data mask. If ``False``, the block data will be returned in its entirety, regardless if some of it falls outside of the ``geometry``.
:param max_workers: Maximum number of worker threads to use, defaults to ``multiprocessing.cpu_count()``.
:param optimize_threadcount: Wheather or not to optimize number of workers. If ``True``, the number of worker threads will be iteratively increased until the average read time per block stops decreasing or ``max_workers`` is reached. If ``False``, ``max_workers`` will be used as the number of threads.
:returns: Generator yielding ``(data, mask, window)`` tuples for each block.
:rtype: Iterator[Tuple(np.ndarray, np.ndarray, rasterio.windows.Window)]
For full usage examples please refer to the block processing tutorial notebook [2].
Examples
========
>>> geom = {
>>> 'type': 'Polygon',
>>> 'coordinates': [[
>>> [4765389, 2441103],
>>> [4764441, 2439352],
>>> [4767369, 2438696],
>>> [4761659, 2441949],
>>> [4765389, 2441103],
>>> ]],
>>> }
>>> block_data_gen = reader.read_overlay(fp)
>>> data, mask, window = next(block_data_gen)
References
==========
[1] `Rasterio Window <https://rasterio.readthedocs.io/en/latest/api/rasterio.windows.html>`_
[2] `Raster block processing tutorial <../notebooks/06_raster_block_processing.html>`_
"""
if isinstance(src_path, str):
src_path = [src_path]
if self.reference is None:
self._build_rtree(src_path[0])
block_idx = self._get_block_indices(geometry)
sources = {}
def _read_worker(window: rio.windows.Window):
import threading
tname = threading.current_thread().name
if tname not in sources:
sources[tname] = [
self._open(sp)
for sp in src_path
]
return _read_block(
sources[tname],
window,
(geometry if geometry_mask else None),
)
n_workers = max_workers
try:
if optimize_threadcount:
n_workers = 2
t_block_best = np.inf
first_results = []
while block_idx.size > 0 and n_workers <= max_workers:
batch_idx, block_idx = block_idx[:n_workers], block_idx[n_workers:]
with ThreadPool(n_workers) as pool:
t_start = datetime.now()
first_results += pool.map(
_read_worker,
self.block_windows[batch_idx],
)
dt = datetime.now() - t_start
dt = (dt.seconds + dt.microseconds * 1e-6) / batch_idx.size
if dt >= t_block_best:
n_workers -= 1
break
t_block_best = dt
n_workers += 1
print(f'reader using {n_workers} threads')
for block_data in first_results:
if block_data is not None:
yield block_data
with ThreadPool(n_workers) as pool:
for block_data in pool.imap_unordered(
_read_worker,
self.block_windows[block_idx],
):
if block_data is not None:
yield block_data
except GeneratorExit:
pass
with ThreadPool(n_workers) as pool:
__ = pool.map(
lambda key: [s.close() for s in sources[key]],
sources,
)
sources = {}
[docs] class RasterBlockAggregator:
"""
Class for aggregating results of block wise raster processing into a single result.
:param reader: RasterBlockReader instance to use for reading rasters.
For full usage examples please refer to the block processing tutorial notebook [1].
Examples
========
>>> from eumap.parallel.blocks import RasterBlockReader, RasterBlockAggregator
>>>
>>> fp = 'https://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_landcover.hcl_lucas.corine.rf_p_30m_0..0cm_2019_eumap_epsg3035_v0.1.tif'
>>>
>>> reader = RasterBlockReader(fp)
>>> aggregator = RasterBlockAggregator(reader)
References
==========
[1] `Raster block processing tutorial <../notebooks/06_raster_block_processing.html>`_
"""
def __init__(self,
reader: RasterBlockReader=None,
):
self.reader = reader
[docs] def aggregate(self,
src_path: Union[str, Iterable[str]],
geometry: dict,
block_func: Callable,
agg_func: Callable=np.mean,
**kwargs,
):
"""
Aggregates results of block wise raster processing into a single result.
:param src_path: Path(s) (or URLs) of the raster file(s) to read. If aggregator is initialized with ``reader=None``, the first file in ``src_path`` will be used to initialize a new reader.
:param geometry: The bounding geometry within which to read raster blocks, given as a dictionary (with the GeoJSON geometry schema).
:param block_func: Callable to perform on the data for each block.
:param agg_func: Callable to produce an aggregation of block-wise results.
:param **kwargs: Additional keyword arguments passed to ``RasterBlockReader.read_overlay()``.
:returns: The result of ``agg_func`` called with block-wise ``block_func`` results as the argument.
For full usage examples please refer to the block processing tutorial notebook [1].
Examples
========
>>> geom = {
>>> 'type': 'Polygon',
>>> 'coordinates': [[
>>> [4765389, 2441103],
>>> [4764441, 2439352],
>>> [4767369, 2438696],
>>> [4761659, 2441949],
>>> [4765389, 2441103],
>>> ]],
>>> }
>>>
>>> def urban_fabric_area(lc):
>>> return (lc==1) * 9e-4 # spatial resolution is 30x30 m
>>>
>>> result = agg.aggregate(
>>> fp, geom,
>>> block_func=urban_fabric_area,
>>> agg_func=np.sum,
>>> )
References
==========
[1] `Raster block processing tutorial <../notebooks/06_raster_block_processing.html>`_
"""
if isinstance(src_path, str):
src_path = [src_path]
if self.reader is None:
self.reader = RasterBlockReader(src_path[0])
(*block_results,) = map(
_RasterBlockFunction(
block_func,
return_data_only=True,
# agg_func=agg_func, # should not be done here
),
self.reader.read_overlay(
src_path,
geometry,
**kwargs,
),
)
result = agg_func(block_results)
return result
[docs] class RasterBlockWriter:
"""
Class for writing results of block wise raster processing results into a new raster file.
:param reader: RasterBlockReader instance to use for reading rasters.
For full usage examples please refer to the block processing tutorial notebook [1].
Examples
========
>>> from eumap.parallel.blocks import RasterBlockReader, RasterBlockWriter
>>>
>>> fp = 'https://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_landcover.hcl_lucas.corine.rf_p_30m_0..0cm_2019_eumap_epsg3035_v0.1.tif'
>>>
>>> reader = RasterBlockReader(fp)
>>> writer = RasterBlockWriter(reader)
References
==========
[1] `Raster block processing tutorial <../notebooks/06_raster_block_processing.html>`_
"""
def __init__(self,
reader: RasterBlockReader=None,
):
self.reader = reader
[docs] def write(self,
src_path: Union[str, Iterable[str]],
dst_path: str,
geometry: dict,
block_func: Callable=_id,
geometry_mask: bool=True,
reader_kwargs: dict={},
**kwargs,
):
"""
Writes block wise calculation results to new raster file.
Performs ``block_func`` on all blocks of file(s) listed in ``src_path`` that intersect with ``geometry`` and writes the results to a new raster.
:param src_path: Path(s) (or URLs) of the raster file(s) to read. If aggregator is initialized with ``reader=None``, the first file in ``src_path`` will be used to initialize a new reader.
:param dst_path: Path to write the result raster to.
:param geometry: The bounding geometry within which to read raster blocks, given as a dictionary (with the GeoJSON geometry schema).
:param block_func: Callable to perform on the data for each block. Result must retain the shape of input data. Defaults to the identity function.
:param geometry_mask: Indicates wheather or not to use the geometry as a data mask. If ``False``, calculation will be performed on all of the block data, regardless if some of it falls outside of the ``geometry``.
:param reader_kwargs: Additional keyword arguments passed to ``RasterBlockReader.read_overlay()``.
:param **kwargs: Additional raster profile keyword arguments passed to the ``rasterio`` dataset writer [1].
For full usage examples please refer to the block processing tutorial notebook [2].
Examples
========
>>> geom = {
>>> 'type': 'Polygon',
>>> 'coordinates': [[
>>> [4765389, 2441103],
>>> [4764441, 2439352],
>>> [4767369, 2438696],
>>> [4761659, 2441949],
>>> [4765389, 2441103],
>>> ]],
>>> }
>>>
>>> def is_urban_fabric(lc):
>>> return lc == 1
>>>
>>> writer.write(fp, 'urban_fabric.tif', geom, is_urban_fabric, dtype='uint8', nodata=0)
References
==========
[1] `Writing datasets with Rasterio <https://rasterio.readthedocs.io/en/latest/quickstart.html#saving-raster-data>`_
[2] `Raster block processing tutorial <../notebooks/06_raster_block_processing.html>`_
"""
if isinstance(src_path, str):
src_path = [src_path]
if self.reader is None:
self.reader = RasterBlockReader(src_path[0])
profile = self.reader.reference.profile
block_indices = self.reader._get_block_indices(geometry)
out_window_geometry = self.reader._blocks2boxes(self.reader.block_windows[block_indices])
out_window_geometry = pg.set_operations.coverage_union_all(out_window_geometry)
out_bounds = pg.measurement.bounds(out_window_geometry)
out_window = rio.windows.from_bounds(
*out_bounds,
transform=profile['transform'],
)
out_transform = self.reader.reference.window_transform(out_window)
profile.update(
transform=out_transform,
width=out_window.width,
height=out_window.height,
)
profile.update(**kwargs)
with rio.open(
dst_path, 'w',
**profile,
) as dst:
for data, mask, window in map(
_RasterBlockFunction(block_func, return_data_only=False),
self.reader.read_overlay(
src_path,
geometry,
geometry_mask=geometry_mask,
**reader_kwargs,
)
):
window_bounds = self.reader.reference.window_bounds(window)
window = rio.windows.from_bounds(*window_bounds, transform=dst.transform)
out = np.full_like(mask, profile['nodata'], dtype=profile['dtype'])
out[mask] = data.astype(profile['dtype'])
dst.write(out, 1, window=window)
except ImportError as e:
from ..misc import _warn_deps
_warn_deps(e, 'blocks')