Source code for eumap.parallel.utils

"""
Parallelization helpers based on thread/process pools and joblib
"""
import numpy
import multiprocessing
from typing import Callable, Iterator, List, Union
from concurrent.futures import as_completed, wait, FIRST_COMPLETED, ProcessPoolExecutor

import warnings
from pathlib import Path
import geopandas as gpd
import numpy as np
from osgeo import osr
import math
import rasterio
from rasterio.mask import mask
from shapely.geometry import Polygon
from rasterio.windows import Window, from_bounds
import os.path
import psutil
import time
import gc

from ..misc import ttprint
from .. import datasets

CPU_COUNT = multiprocessing.cpu_count()
"""
Number of CPU cores available.
"""

def _mem_usage():
  mem = psutil.virtual_memory()
  return (mem.used / mem.total)
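
# Illustrative sketch (not part of the original module): `_mem_usage()`
# returns the fraction of system memory currently in use, in [0, 1].
# The actual value is machine-dependent.
#
# >>> _mem_usage()
# 0.42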

def _run_task(i, task, mem_usage_limit, mem_check_interval, mem_check, verbose, *args):
  while (_mem_usage() > mem_usage_limit and mem_check):
    if verbose:
      ttprint(f'Memory usage at {_mem_usage()*100:.2f}%, pausing workers for {task.__name__}')
    time.sleep(mem_check_interval)
    gc.collect()
  
  return task(*args)
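
# Usage sketch (illustrative; `slow_read` and the file path are hypothetical):
# `_run_task` blocks while system memory usage is above `mem_usage_limit`,
# re-checking every `mem_check_interval` seconds, and only then runs the task
# with the remaining positional arguments.
#
# >>> def slow_read(path):
# >>>   with open(path, 'rb') as f:
# >>>     return f.read()
# >>>
# >>> # i=0, mem_usage_limit=0.75, mem_check_interval=10, mem_check=True, verbose=True
# >>> result = _run_task(0, slow_read, 0.75, 10, True, True, '/tmp/data.bin')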

def ThreadGeneratorLazy(
  worker:Callable,
  args:Iterator[tuple],
  max_workers:int = CPU_COUNT,
  chunk:int = CPU_COUNT*2,
  fixed_args:tuple = ()
):
  """
  Execute a function in parallel using a ``ThreadPoolExecutor`` [1].

  :param worker: Function to execute in parallel.
  :param args: Argument iterator where each element is sent as a separate job to the pool.
  :param max_workers: Number of CPU cores to use in the parallelization. By default all cores are used.
  :param chunk: Number of jobs submitted to the pool at a time (the queue is refilled as jobs complete).
  :param fixed_args: Constant arguments appended to each element of ``args`` on every execution of the ``worker`` function.

  :returns: A generator with the return of all workers.
  :rtype: Generator

  Examples
  ========

  >>> from eumap.parallel import ThreadGeneratorLazy
  >>>
  >>> def worker(i, msg):
  >>>   print(f'{i}: {msg}')
  >>>   return f'Worker {i} finished'
  >>>
  >>> args = iter([ (i,) for i in range(0,5) ])
  >>> fixed_args = ("I'm running in parallel", )
  >>>
  >>> for result in ThreadGeneratorLazy(worker, args, fixed_args=fixed_args):
  >>>   print(result)

  References
  ==========

  [1] `Python ThreadPoolExecutor class <https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor>`_

  """
  import concurrent.futures
  from itertools import islice

  with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    group = islice(args, chunk)
    futures = { executor.submit(worker, *arg + fixed_args) for arg in group }

    while futures:
      done, futures = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED
      )

      for future in done:
        yield future.result()

      group = islice(args, chunk)
      for arg in group:
        futures.add(executor.submit(worker, *arg + fixed_args))

def ProcessGeneratorLazy(
  worker:Callable,
  args:Iterator[tuple],
  max_workers:int = CPU_COUNT,
  chunk:int = CPU_COUNT*2,
  fixed_args:tuple = ()
):
  """
  Execute a function in parallel using a ``ProcessPoolExecutor`` [1].

  :param worker: Function to execute in parallel.
  :param args: Argument iterator where each element is sent as a separate job to the pool.
  :param max_workers: Number of CPU cores to use in the parallelization. By default all cores are used.
  :param chunk: Number of jobs submitted to the pool at a time (the queue is refilled as jobs complete).
  :param fixed_args: Constant arguments appended to each element of ``args`` on every execution of the ``worker`` function.

  :returns: A generator with the return of all workers.
  :rtype: Generator

  Examples
  ========

  >>> from eumap.parallel import ProcessGeneratorLazy
  >>>
  >>> def worker(i, msg):
  >>>   print(f'{i}: {msg}')
  >>>   return f'Worker {i} finished'
  >>>
  >>> args = iter([ (i,) for i in range(0,5) ])
  >>> fixed_args = ("I'm running in parallel", )
  >>>
  >>> for result in ProcessGeneratorLazy(worker, args, fixed_args=fixed_args):
  >>>   print(result)

  References
  ==========

  [1] `Python ProcessPoolExecutor class <https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor>`_

  """
  import concurrent.futures
  from itertools import islice

  with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
    group = islice(args, chunk)
    futures = { executor.submit(worker, *arg + fixed_args) for arg in group }

    while futures:
      done, futures = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED
      )

      for future in done:
        yield future.result()

      group = islice(args, chunk)
      for arg in group:
        futures.add(executor.submit(worker, *arg + fixed_args))

def job(
  worker:Callable,
  worker_args:Iterator[tuple],
  n_jobs:int = -1,
  joblib_args:dict = {}
):
  """
  Execute a function in parallel using joblib [1].

  :param worker: Function to execute in parallel.
  :param worker_args: Argument iterator where each element is sent to a separate job.
  :param n_jobs: Number of CPU cores to use in the parallelization. By default all cores are used.
  :param joblib_args: Joblib arguments to send to the ``Parallel`` class [1].

  :returns: A generator with the return of all workers.
  :rtype: Generator

  Examples
  ========

  >>> from eumap import parallel
  >>>
  >>> def worker(i, msg):
  >>>   print(f'{i}: {msg}')
  >>>   return f'Worker {i} finished'
  >>>
  >>> msg = ("I'm running in parallel", )
  >>> args = iter([ (i,msg) for i in range(0,5) ])
  >>>
  >>> for result in parallel.job(worker, args, n_jobs=-1, joblib_args={'backend': 'threading'}):
  >>>   print(result)

  References
  ==========

  [1] `joblib.Parallel class <https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html#joblib.Parallel>`_

  """
  from joblib import Parallel, delayed

  joblib_args['n_jobs'] = n_jobs

  for worker_result in Parallel(**joblib_args)(delayed(worker)(*args) for args in worker_args):
    yield worker_result

def apply_along_axis(
  worker:Callable,
  axis,
  arr:numpy.array,
  *args:any,
  **kwargs:any
):
  """
  Execute a function through a ``numpy.array`` axis in parallel [1].
  It uses joblib with ``backend=loky``, so avoid sending shared-memory
  objects as arguments.

  :param worker: Function to execute in parallel. It needs to have at least one argument (``numpy.array``).
  :param axis: Axis used to execute the worker.
  :param arr: The input array.
  :param args: Additional arguments to the worker.
  :param kwargs: Additional named arguments to the worker.

  :returns: The output array with one dimension less than the input array.
  :rtype: numpy.array

  Examples
  ========

  >>> from eumap import parallel
  >>>
  >>> def fn(arr, const):
  >>>   return np.sum(arr) + const
  >>>
  >>> const = 1
  >>> arr = np.ones((100,100,100))
  >>>
  >>> out = parallel.apply_along_axis(fn, 0, arr, const)
  >>> print(arr.shape, out.shape)

  References
  ==========

  [1] `Best answer from Eric O Lebigot <https://stackoverflow.com/a/45555516>`_

  """
  import numpy as np

  def run(worker, axis, arr, args, kwargs):
    return np.apply_along_axis(worker, axis, arr, *args, **kwargs)

  # Like numpy.apply_along_axis(), but takes advantage of multiple cores.
  # Effective axis where apply_along_axis() will be applied by each
  # worker (any non-zero axis number would work, so as to allow the use
  # of `np.array_split()`, which is only done on axis 0):
  effective_axis = 1 if axis == 0 else axis
  if effective_axis != axis:
    arr = arr.swapaxes(axis, effective_axis)

  # Chunks for the mapping (only a few chunks):
  chunks = [
    (worker, effective_axis, sub_arr, args, kwargs)
    for sub_arr in np.array_split(arr, CPU_COUNT)
  ]

  result = []
  for r in job(run, chunks):
    result.append(r)

  return np.concatenate(result)
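
# A minimal check of the axis-swap trick used above (illustrative only):
# splitting on axis 0 while applying the worker along a non-zero effective
# axis, then concatenating, matches a plain numpy.apply_along_axis call.
#
# >>> import numpy as np
# >>> arr = np.arange(24).reshape(4, 6)
# >>> expected = np.apply_along_axis(np.sum, 0, arr)
# >>> swapped = arr.swapaxes(0, 1)         # axis 0 becomes effective axis 1
# >>> parts = np.array_split(swapped, 2)   # np.array_split works on axis 0
# >>> got = np.concatenate([ np.apply_along_axis(np.sum, 1, p) for p in parts ])
# >>> assert np.array_equal(expected, got)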

class TilingProcessing():
  """
  Execute a processing function in parallel considering a tiling system
  and a base raster. It creates a rasterio ``window`` object for each tile
  according to the pixel size of the specified base raster.

  :param tiling_system_fn: Vector file path with the tiles to read.
  :param base_raster_fn: Raster file path used to retrieve the ``affine transformation`` for the ``windows``.
  :param verbose: Use ``True`` to print information about the read tiles and the base raster.
  """

  def __init__(self,
    tiling_system_fn = 'http://s3.eu-central-1.wasabisys.com/eumap/tiling_system_30km.gpkg',
    base_raster_fn = 'http://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201903_eumap_epsg3035_v1.0.tif',
    verbose:bool = False,
    epsg_checking:bool = True
  ):
    from pyproj import CRS

    self.tiles = gpd.read_file(tiling_system_fn)
    self.num_tiles = self.tiles.shape[0]
    self.base_raster = rasterio.open(base_raster_fn)

    tile_epsg = CRS(self.tiles.crs.to_wkt()).to_epsg()
    raster_epsg = CRS(self.base_raster.crs.to_wkt()).to_epsg()

    if epsg_checking and tile_epsg != raster_epsg:
      raise Exception(
        'Different SpatialReference' +
        f'\n tiling_system_fn:\n{self.tiles.crs.to_wkt()}' +
        f'\n base_raster_fn:\n{self.base_raster.crs.to_wkt()}'
      )

    if verbose:
      pixel_size = self.base_raster.transform[0]
      ttprint(f'Pixel size equal to {pixel_size} in {Path(base_raster_fn).name}')
      ttprint(f'{self.num_tiles} tiles available in {Path(tiling_system_fn).name}')
      ttprint(f'Using EPSG:{raster_epsg}')

  def _tile_window(self, idx):
    tile = self.tiles.iloc[idx]
    left, bottom, right, top = tile.geometry.bounds

    return tile, from_bounds(left, bottom, right, top, self.base_raster.transform)

  def process_one(self,
    idx:int,
    func:Callable,
    *args:any
  ):
    """
    Process a single tile using the specified function args.

    :param idx: The tile id to process. This idx is generated for all the tiles in a sequence starting from ``0``.
    :param func: A function with at least the arguments ``idx, tile, window``.
    :param args: Additional arguments to send to the function.

    Examples
    ========

    >>> from eumap.parallel import TilingProcessing
    >>> from eumap.raster import read_rasters
    >>>
    >>> def run(idx, tile, window, raster_files):
    >>>   data, _ = read_rasters(raster_files=raster_files, spatial_win=window, verbose=True)
    >>>   print(f'Tile {idx}: data read {data.shape}')
    >>>
    >>> raster_files = [
    >>>   'http://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201903_eumap_epsg3035_v1.0.tif', # winter
    >>>   'http://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201906_eumap_epsg3035_v1.0.tif', # spring
    >>>   'http://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201909_eumap_epsg3035_v1.0.tif', # summer
    >>>   'http://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201912_eumap_epsg3035_v1.0.tif'  # fall
    >>> ]
    >>>
    >>> tiling = TilingProcessing(verbose=True)
    >>> tiling.process_one(0, run, raster_files)
    """
    tile, window = self._tile_window(idx)
    return func(idx, tile, window, *args)

  def process_multiple(self,
    idx_list:List[int],
    func:Callable,
    *args:any,
    max_workers:int = CPU_COUNT,
    use_threads:bool = True,
    progress_bar:bool = False
  ):
    """
    Process a list of tiles in parallel using the specified function args.

    :param idx_list: The tile ids to process. These ids are generated for all the tiles in a sequence starting from ``0``.
    :param func: A function with at least the arguments ``idx, tile, window``.
    :param args: Additional arguments to send to the function.
    :param max_workers: Number of CPU cores to use in the parallelization. By default all cores are used.
    :param use_threads: If ``True`` the parallel processing uses ``ThreadGeneratorLazy``, otherwise it uses ``ProcessGeneratorLazy``.
    :param progress_bar: If ``True`` the parallel processing uses ``pqdm`` [1], presenting a progress bar and selecting the thread or process backend according to ``use_threads``.

    Examples
    ========

    >>> from eumap.parallel import TilingProcessing
    >>> from eumap.raster import read_rasters
    >>>
    >>> def run(idx, tile, window, raster_files):
    >>>   data, _ = read_rasters(raster_files=raster_files, spatial_win=window, verbose=True)
    >>>   print(f'Tile {idx}: data read {data.shape}')
    >>>
    >>> raster_files = [
    >>>   'http://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201903_eumap_epsg3035_v1.0.tif', # winter
    >>>   'http://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201906_eumap_epsg3035_v1.0.tif', # spring
    >>>   'http://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201909_eumap_epsg3035_v1.0.tif', # summer
    >>>   'http://s3.eu-central-1.wasabisys.com/eumap/lcv/lcv_ndvi_landsat.glad.ard_p50_30m_0..0cm_201912_eumap_epsg3035_v1.0.tif'  # fall
    >>> ]
    >>>
    >>> tiling = TilingProcessing(verbose=True)
    >>> idx_list = [0,10,100]
    >>> result = tiling.process_multiple(idx_list, run, raster_files)

    References
    ==========

    [1] `Parallel TQDM <https://pqdm.readthedocs.io/en/latest/readme.html>`_

    """
    _args = []

    for idx in idx_list:
      tile, window = self._tile_window(idx)
      _args.append((idx, tile, window, *args))

    if progress_bar:
      try:
        from pqdm.processes import pqdm as ppqdm
        from pqdm.threads import pqdm as tpqdm

        pqdm = (tpqdm if use_threads else ppqdm)
        results = pqdm(iter(_args), func, n_jobs=max_workers, argument_type='args')
      except ImportError as e:
        from ..misc import _warn_deps
        _warn_deps(e, 'progress bar')
        progress_bar = False

    if not progress_bar:
      WorkerPool = (ThreadGeneratorLazy if use_threads else ProcessGeneratorLazy)

      results = []
      for r in WorkerPool(func, iter(_args), max_workers=max_workers, chunk=max_workers*2):
        results.append(r)

    return results

  def process_all(self,
    func:Callable,
    *args:any,
    max_workers:int = CPU_COUNT,
    use_threads:bool = True,
    progress_bar:bool = False
  ):
    """
    Process all tiles in parallel using the specified function args.

    :param func: A function with at least the arguments ``idx, tile, window``.
    :param args: Additional arguments to send to the function.
    :param max_workers: Number of CPU cores to use in the parallelization. By default all cores are used.
    :param use_threads: If ``True`` the parallel processing uses ``ThreadGeneratorLazy``, otherwise it uses ``ProcessGeneratorLazy``.
    :param progress_bar: If ``True`` the parallel processing uses ``pqdm`` [1], presenting a progress bar and selecting the thread or process backend according to ``use_threads``.

    Examples
    ========

    >>> from eumap.parallel import TilingProcessing
    >>> from eumap.raster import read_rasters
    >>>
    >>> def run(idx, tile, window, msg):
    >>>   print(f'Tile {idx} => {msg}')
    >>>
    >>> tiling = TilingProcessing(verbose=True)
    >>> msg = "Let's crunch some data."
    >>> result = tiling.process_all(run, msg)

    References
    ==========

    [1] `Parallel TQDM <https://pqdm.readthedocs.io/en/latest/readme.html>`_

    """
    idx_list = range(0, self.num_tiles)
    return self.process_multiple(idx_list, func, *args, max_workers=max_workers,
      use_threads=use_threads, progress_bar=progress_bar)

  @staticmethod
  def generate_tiles(
    tile_size:int,
    extent:tuple,
    crs:str,
    raster_layer_fn:str = None,
  ):
    """
    Generate a custom tiling system based on a regular grid.

    :param tile_size: Single value used to define the width and height of an individual tile.
      It assumes the same unit as ``crs`` (degrees for geographic coordinate systems and
      meters for projected coordinate systems). Tiles falling outside of the informed extent
      are clipped to fit in it.
    :param extent: Extent definition considering ``minx, miny, maxx, maxy`` according to the ``crs`` argument.
    :param crs: Coordinate reference system for the tile geometries. Can be anything accepted by
      ``pyproj.CRS.from_user_input()``, such as an authority string (``EPSG:4326``) or a WKT/proj4 string.
    :param raster_layer_fn: If provided, for each tile the ``min``, ``max`` and ``mode`` values are
      calculated considering the raster pixels inside the tile. It assumes the same ``crs`` for the
      raster layer and the tiles.

    :returns: Tiling system with the following columns: ``tile_id``, ``minx``, ``miny``, ``maxx``,
      ``maxy`` and ``geometry``. The additional columns ``raster_min``, ``raster_mode_value``,
      ``raster_mode_count`` and ``raster_max`` are returned when a raster layer is provided.
    :rtype: geopandas.GeoDataFrame

    Examples
    ========

    >>> eumap_extent = (900000, 930010, 6540000, 5460010)
    >>> tiling_system = TilingProcessing.generate_tiles(30000, eumap_extent, 'epsg:3035')
    >>> tiling_system.to_file(tiling_system_fn, driver="GPKG")

    """
    minx, miny, maxx, maxy = extent

    data = {'tile_id': [], 'minx':[], 'miny':[], 'maxx':[], 'maxy':[], 'geometry':[]}

    tile_id = 0
    for x1 in np.arange(minx, maxx, tile_size):
      for y1 in np.arange(miny, maxy, tile_size):
        x2 = x1 + tile_size
        if x2 > maxx:
          x2 = maxx

        y2 = y1 + tile_size
        if y2 > maxy:
          y2 = maxy

        data['tile_id'].append(tile_id)
        data['minx'].append(x1)
        data['miny'].append(y1)
        data['maxx'].append(x2)
        data['maxy'].append(y2)
        data['geometry'].append(Polygon([ (x1,y1), (x2,y1), (x2,y2), (x1,y2) ]))

        tile_id += 1

    tiles = gpd.GeoDataFrame(data).set_crs(crs, inplace=True)

    if raster_layer_fn is not None:

      def _raster_values(tile, raster_layer_fn):
        shapes = [ tile['geometry'] ]

        try:
          with rasterio.open(raster_layer_fn) as src:
            out_image, out_transform = mask(src, shapes, crop=True, filled=True)
            out_image = out_image.astype('float32')
            nodata_val = src.nodatavals[0]

          _values, _counts = np.unique(out_image, return_counts=True)

          values, counts = [], []
          for v, c in zip(_values, _counts):
            if (v != nodata_val):
              values.append(v)
              counts.append(c)

          values = np.array(values)
          counts = np.array(counts)

          m = np.argmax(counts)
          tile['raster_min'] = np.min(values)
          tile['raster_mode_value'] = values[m]
          tile['raster_mode_count'] = counts[m]
          tile['raster_max'] = np.max(values)
        except Exception:
          tile['raster_min'] = None
          tile['raster_mode_value'] = None
          tile['raster_mode_count'] = None
          tile['raster_max'] = None

        return tile

      args = [ (tiles.loc[i,:], raster_layer_fn) for i in range(0, len(tiles)) ]

      result = []
      for t in job(_raster_values, args):
        result.append(t)

      tiles = gpd.GeoDataFrame(result).set_crs(crs, inplace=True)

    return tiles
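
# A minimal sketch of how each tile is mapped to a rasterio window (this is
# what `_tile_window` does internally; the default tiling system and base
# raster are fetched from the remote URLs above):
#
# >>> tiling = TilingProcessing(verbose=True)
# >>> tile, window = tiling._tile_window(0)
# >>> data = tiling.base_raster.read(1, window=window)
# >>> print(window, data.shape)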

class TaskSequencer():
  """
  Execute a pipeline of sequential tasks, in a way that the output of one
  task is used as input for the next task. For each task, a pool of workers
  is created, allowing all the available workers to run in parallel over
  different portions of the input data.

  :param tasks: Task definition list, where each element can be: (1) a ``Callable`` function;
    (2) a tuple containing a ``Callable`` function and the number of workers for the task; or
    (3) a tuple containing a ``Callable`` function, the number of workers and a ``bool``
    indicating whether the task should respect the ``mem_usage_limit``. The default number
    of workers is ``1``.
  :param mem_usage_limit: Percentage of memory usage that, when reached, triggers a momentary
    stop of execution for specific tasks. For example, if ``task_1`` is responsible for reading
    the data and ``task_2`` for processing it, the ``task_1`` definition can receive a ``bool``
    indication to respect the ``mem_usage_limit``, allowing ``task_2`` to process the data
    that has already been read and to release memory for the next ``task_1`` reads.
  :param wait_timeout: Timeout argument used by ``concurrent.futures.wait``.
  :param verbose: Use ``True`` to print the communication and status of the tasks.

  Examples
  ========

  >>> from eumap.parallel import TaskSequencer
  >>>
  >>> output = TaskSequencer(
  >>>   tasks=[
  >>>     task_1,
  >>>     (task_2, 2)
  >>>   ]
  >>> )

  Pipeline produced by this example code:

  >>>               ----------      ----------
  >>> input_data -> | task_1 |  ->  | task_2 |  -> output_data
  >>>               ----------      ----------
  >>>                   |               |
  >>>                   |-worker_1      |-worker_1
  >>>                                   |-worker_2
  """

  def __init__(self,
    tasks:Union[List[Callable], List[tuple]],
    mem_usage_limit:float = 0.75,
    wait_timeout:int = 5,
    verbose:bool = False
  ):
    self.wait_timeout = wait_timeout
    self.mem_usage_limit = mem_usage_limit
    self.verbose = verbose
    self.mem_check_interval = 10

    self.tasks = []
    self.pipeline = []
    self.mem_checks = []

    for task in tasks:
      pool_size = 1
      mem_check = False

      if type(task) is tuple:
        if len(task) == 2:
          task, pool_size = task
        else:
          task, pool_size, mem_check = task

      self._verbose(f'Starting {pool_size} worker(s) for {task.__name__} (mem_check={mem_check})')

      self.tasks.append(task)
      self.pipeline.append(ProcessPoolExecutor(max_workers = pool_size))
      self.mem_checks.append(mem_check)

    self.n_tasks = len(self.tasks)
    self.pipeline_futures = [ set() for i in range(0, self.n_tasks) ]

  def _verbose(self, *args, **kwargs):
    if self.verbose:
      ttprint(*args, **kwargs)

  def run(self,
    input_data:List[tuple]
  ):
    """
    Run the task pipeline considering the ``input_data`` argument.

    :param input_data: Input data used to feed the first task.

    :returns: List of returned values produced by the last task, with the same size as the ``input_data`` argument.
    :rtype: List

    Examples
    ========

    >>> from eumap.misc import ttprint
    >>> from eumap.parallel import TaskSequencer
    >>> import time
    >>>
    >>> def rnd_data(const, size):
    >>>   data = np.random.rand(size, size, size)
    >>>   time.sleep(2)
    >>>   return (const, data)
    >>>
    >>> def max_value(const, data):
    >>>   ttprint(f'Calculating the max value over {data.shape}')
    >>>   time.sleep(8)
    >>>   result = np.max(data + const)
    >>>   return result
    >>>
    >>> taskSeq = TaskSequencer(
    >>>   tasks=[
    >>>     rnd_data,
    >>>     (max_value, 2)
    >>>   ],
    >>>   verbose=True
    >>> )
    >>>
    >>> taskSeq.run(input_data=[ (const, 10) for const in range(0,3) ])
    >>> taskSeq.run(input_data=[ (const, 20) for const in range(3,6) ])
    """
    for i_dta in input_data:
      self._verbose(f'Submission to {self.tasks[0].__name__}')
      self.pipeline_futures[0].add(
        self.pipeline[0].submit(_run_task, *(
          0, self.tasks[0], self.mem_usage_limit, self.mem_check_interval,
          self.mem_checks[0], self.verbose, *i_dta
        ))
      )

    keep_going = True
    while keep_going:
      keep_going_aux = []

      for i in range(0, self.n_tasks - 1):
        done, self.pipeline_futures[i] = wait(self.pipeline_futures[i],
          return_when=FIRST_COMPLETED, timeout=self.wait_timeout)

        if len(done) > 0:
          self._verbose(f'{self.tasks[i].__name__} state: done={len(done)} waiting={len(self.pipeline_futures[i])}')

        for f in done:
          next_i = i + 1
          self._verbose(f'Submission to {self.tasks[next_i].__name__}')
          self.pipeline_futures[next_i].add(
            self.pipeline[next_i].submit(_run_task, *(
              next_i, self.tasks[next_i], self.mem_usage_limit, self.mem_check_interval,
              self.mem_checks[next_i], self.verbose, *f.result()
            ))
          )

        keep_going_aux.append(len(self.pipeline_futures[i]) > 0)

      keep_going = any(keep_going_aux)

    self._verbose(f'Waiting {self.tasks[-1].__name__}')
    result = [ future.result() for future in as_completed(self.pipeline_futures[-1]) ]

    return result
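
# Usage sketch for the memory-aware task form (illustrative; `read_data` and
# `process_data` are hypothetical). A 3-element tuple enables the
# `mem_usage_limit` check for that task, so new `read_data` jobs pause while
# memory usage is above 75%, letting `process_data` drain what was read:
#
# >>> taskSeq = TaskSequencer(
# >>>   tasks=[
# >>>     (read_data, 2, True),
# >>>     (process_data, 4)
# >>>   ],
# >>>   mem_usage_limit=0.75,
# >>>   verbose=True
# >>> )
# >>> results = taskSeq.run(input_data=[ ('/tmp/f1.bin',), ('/tmp/f2.bin',) ])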