from typing import List, Union
from dateutil.relativedelta import relativedelta
import requests
import numpy as np
import bottleneck as bc
import joblib
from pathlib import Path
import gc
import os
import warnings
from eumap import parallel
from eumap.raster import read_auth_rasters, save_rasters
from eumap.misc import _warn_deps, _eval, nan_percentile, ttprint, find_files, GoogleSheet
[docs]class GLADLandsat():
"""
Automation to download and process Landsat ARD images provided GLAD [1,2].
:param username: Username to access the files [3].
:param password: Password to access the files [3].
:param parallel_download: Number of files to download in parallel.
:param filter_additional_qa: Use ``True`` to remove pixels flagged as additional
cloud (``11 to 17`` - table 3 in [2]).
:param verbose: Use ``True`` to print the progress of all steps.
References
==========
[1] `User Manual - GLAD Landsat ARD <https://glad.geog.umd.edu/ard/user-manual>`_
[2] `Remote Sensing paper (Potapov, et al, 2020) <https://doi.org/10.3390/rs12030426>`_
[3] `User Registration - GLAD Landsat ARD <https://glad.geog.umd.edu/ard/user-registration>`_
"""
def __init__(self,
username:str,
password:str,
parallel_download:int = 4,
filter_additional_qa:bool = True,
verbose:bool = True,
):
self.username = username
self.password = password
self.base_url = 'https://glad.umd.edu/dataset/landsat_v1.1'
self.parallel_download = parallel_download
self.verbose = verbose
self.filter_additional_qa = filter_additional_qa
self.bqa_idx = 7
self.max_spectral_val = 40000
super().__init__()
def _glad_id(self, year_interval_id):
year, interval_id = year_interval_id.split('-')
return (int(year) - 1980) * 23 + int(interval_id)
def _glad_urls(self, tile, start, end):
i0 = self._glad_id(start)
i1 = self._glad_id(end)
urls = []
lat = tile.split('_')[1]
for i in range(i0, (i1+1)):
urls.append(f'{self.base_url}/{lat}/{tile}/{i}.tif')
return urls
def _verbose(self, *args, **kwargs):
if self.verbose:
ttprint(*args, **kwargs)
[docs] def read(self,
tile:str,
start:str,
end:str,
clear_sky:bool = True,
min_clear_sky:float = 0.2
):
"""
Download and read multiple dates for a specific tile. The images are
read on-the-fly without save the download files in disk.
:param tile: GLAD tile id to be processed according to [1].
:param start: Start date format (``{YEAR}-{INTERVAL_ID}`` - ``2000-1``).
The ``INTERVAL_ID`` ranges from 1 to 23 according to [2].
:param end: End date format (``{YEAR}-{INTERVAL_ID}`` - ``2000-1``).
The ``INTERVAL_ID`` ranges from 1 to 23 according to [2].
:param clear_sky: Use ``True`` to keep only the clear sky pixels, which are
``1, 2, 5 and 6`` according to quality flag band (Table 3 in [3]). For
``filter_additional_qa=False`` the pixels ``11 to 17`` are also considered
clear sky.
:param min_clear_sky: Minimum percentage of clear sky pixels in the tile to
keep a specific data, otherwise remove it from the result.
:returns: The read data, the accessed URLs and the Path for a empty base raster.
:rtype: Tuple[Numpy.array, List[str], Path]
Examples
========
>>> from eumap.datasets.eo import GLADLandsat
>>>
>>> # Do the registration in
>>> # https://glad.umd.edu/ard/user-registration
>>> username = '<YOUR_USERNAME>'
>>> password = '<YOUR_PASSWORD>'
>>> glad_landsat = GLADLandsat(username, password, verbose=True)
>>> data, urls, base_raster = glad_landsat.read('092W_47N', '2020-6', '2020-10')
>>> print(f'Data shape: {data.shape}')
References
==========
[1] `File glad_ard_tiles.shp (GLAD Landsat ARD Tools V1.1) <https://glad.geog.umd.edu/ard/software-download>`_
[2] `File 16d_intervals.xlsx (GLAD Landsat ARD Tools V1.1) <https://glad.geog.umd.edu/ard/software-download>`_
[3] `Remote Sensing paper (Potapov, et al, 2020) <https://doi.org/10.3390/rs12030426>`_
"""
url_list = self._glad_urls(tile, start, end)
data, base_raster = read_auth_rasters(
raster_files = url_list,
username = self.username,
password = self.password,
verbose = self.verbose,
return_base_raster = True,
nodata = 0,
n_jobs = self.parallel_download
)
if (isinstance(data, np.ndarray)):
self._verbose(f'Read data {data.shape}.')
if clear_sky:
self._verbose(f'Removing cloud and cloud shadow pixels.')
clear_sky_mask, clear_sky_pct = self._clear_sky_mask(data)
data[~clear_sky_mask] = np.nan
clear_sky_idx = np.where(clear_sky_pct >= min_clear_sky)[0]
data = data[:,:,:,clear_sky_idx]
if len(clear_sky_idx) == 0:
url_list = []
return data, url_list, base_raster
def _clear_sky_mask(self, data):
qa_data = data[self.bqa_idx,:,:,:]
if(np.nanmax(qa_data) > 255):
qa_data /= 100
clear_sky1 = np.logical_and(qa_data >= 1, qa_data <= 2)
clear_sky2 = np.logical_and(qa_data >= 5, qa_data <= 6)
clear_sky_mask = np.logical_or(clear_sky1, clear_sky2)
if not self.filter_additional_qa:
clear_sky3 = np.logical_and(qa_data >= 11, qa_data <= 17)
clear_sky_mask = np.logical_or(clear_sky_mask, clear_sky3)
clear_sky_mask = np.broadcast_to(clear_sky_mask, shape=data.shape)
nrow, ncols = data.shape[1], data.shape[2]
clear_sky_pct = []
for i in range(0, data.shape[3]):
clear_sky_count = bc.nansum(clear_sky_mask[0,:,:,i].astype(np.int))
clear_sky_pct.append(clear_sky_count / (nrow * ncols))
return clear_sky_mask, np.array(clear_sky_pct)
def _save_percentile_agg(self, result, base_raster, tile, start, p, output_dir, unit8):
if not isinstance(output_dir, Path):
output_dir = Path(output_dir)
output_files = []
fn_raster_list = []
data = []
for i in range(0, result.shape[0]):
band_data = result[i,:,:,:]
dtype = 'uint16'
if unit8:
scale, conv = self.max_spectral_val, 255
valid_mask = ~np.isnan(band_data)
band_data[valid_mask] = ((band_data[valid_mask] / scale) * conv)
band_data[valid_mask] = np.floor(band_data[valid_mask])
dtype = 'uint8'
for paux in p:
p_dir = output_dir.joinpath(f'{start}').joinpath(f'B{i+1}_P{int(paux)}')
fn_raster_list.append(p_dir.joinpath(f'{tile}.tif'))
data.append(band_data)
data = np.concatenate(data, axis=2)
output_files += save_rasters(fn_base_raster=base_raster, data=data, fn_raster_list=fn_raster_list,
dtype=dtype, nodata=0, fit_in_dtype=True, n_jobs=self.parallel_download, verbose=self.verbose)
return output_files
def _calc_save_count(self, data, max_val, base_raster, tile, start, output_dir):
band_data = data[0,:,:,:]
band_data = band_data.copy()
band_valid_mask = np.logical_and(band_data >= 1.0, band_data <= max_val)
band_data[~band_valid_mask] = np.nan
band_count = bc.nansum(np.logical_not(np.isnan(band_data)).astype('uint8'), axis=2)
band_count = np.stack([band_count], axis=2)
if not isinstance(output_dir, Path):
output_dir = Path(output_dir)
fn_raster = output_dir.joinpath(f'{start}') \
.joinpath(f'B1_COUNT') \
.joinpath(f'{tile}.tif')
dtype = 'uint8'
save_rasters(fn_base_raster=base_raster, data=band_count, fn_raster_list=[ fn_raster ],
dtype=dtype, nodata=0, n_jobs=1, verbose=self.verbose)
return fn_raster
[docs] def percentile_agg(self,
tile:str,
start:str,
end:str,
p:List,
clear_sky:bool = True,
min_clear_sky:bool = 0.2,
n_jobs = 7,
output_dir:Path = None,
unit8:bool = True
):
"""
Download, read and aggregate multiple dates in different percentiles.
:param tile: GLAD tile id to be processed according to [1].
:param start: Start date format (``{YEAR}-{INTERVAL_ID}`` - ``2000-1``).
The ``INTERVAL_ID`` ranges from 1 to 23 according to [2].
:param end: End date format (``{YEAR}-{INTERVAL_ID}`` - ``2000-1``).
The ``INTERVAL_ID`` ranges from 1 to 23 according to [2].
:param p: A list with the percentiles values between 0 and 100.
:param clear_sky: Use ``True`` to keep only the clear sky pixels, which are
``1, 2, 5 and 6`` according to quality flag band (Table 3 in [3]). For
``filter_additional_qa=False`` the pixels ``11 to 17`` are also considered
clear sky.
:param min_clear_sky: Minimum percentage of clear sky pixels in the tile to
keep a specific data, otherwise remove it from the result.
:param n_jobs: Number of jobs to process the spectral bands in parallel. More
then ``7`` is meaningless and don't improve the performance.
:param output_dir: If provided save the result to this folder. By default is
``None`` and no files are saved to disk.
:param unit8: Use ``True`` to convert the read data to ``unit8``.
:returns: The read data, the Path for a empty base raster and the saved
files (only if ``output_dir=True``).
:rtype: Tuple[Numpy.array, List[str], Path]
Examples
========
>>> from eumap.datasets.eo import GLADLandsat
>>>
>>> # Do the registration here
>>> # https://glad.umd.edu/ard/user-registration
>>> username = '<YOUR_USERNAME>'
>>> password = '<YOUR_PASSWORD>'
>>> glad_landsat = GLADLandsat(username, password, verbose=True)
>>> data, base_raster, _ = glad_landsat.percentile_agg('092W_47N', '2020-6', '2020-10',
>>> p=[25,50,75], output_dir='./glad_landsat_ard_percentiles')
>>> print(f'Shape of data: {data.shape}')
References
==========
[1] `File glad_ard_tiles.shp (GLAD Landsat ARD Tools V1.1) <https://glad.geog.umd.edu/ard/software-download>`_
[2] `File 16d_intervals.xlsx (GLAD Landsat ARD Tools V1.1) <https://glad.geog.umd.edu/ard/software-download>`_
[3] `Remote Sensing paper (Potapov, et al, 2020) <https://doi.org/10.3390/rs12030426>`_
"""
def _run(band_data, p, i, max_val):
if (i == 6):
max_val = 65000
band_data = band_data.copy()
band_valid_mask = np.logical_and(band_data >= 1.0, band_data <= max_val)
band_data[~band_valid_mask] = np.nan
perc_data = nan_percentile(band_data.transpose(2,0,1), p)
perc_data = np.stack(perc_data, axis=0)
return (perc_data, i)
data, url_list, base_raster = self.read(tile, start, end, clear_sky, min_clear_sky)
if len(url_list) > 0:
self._verbose(f'Aggregating by {len(p)} percentiles.')
args = []
for i in range(0, self.bqa_idx):
args.append( (data[i,:,:,:], p, i, self.max_spectral_val) )
result = {}
for perc_data, i in parallel.job(_run, args, n_jobs=n_jobs):
result[i] = perc_data
result = [result[i] for i in range(0,len(args))]
if len(result) > 0:
result = np.stack(result, axis=3).transpose(3,1,2,0)
output_files = []
if output_dir is not None:
self._verbose(f'Saving the result in {output_dir}.')
output_files = self._save_percentile_agg(result, base_raster, tile, start, p, output_dir, unit8)
self._verbose(f'Counting clear_sky pixels')
output_file_count = self._calc_save_count(data, self.max_spectral_val, base_raster, tile, start, output_dir)
output_files.append(output_file_count)
del data
gc.collect()
self._verbose(f'Removing {base_raster}')
os.remove(base_raster)
return result, base_raster, output_files
else:
raise RuntimeError('All the images were read, but no clear_sky data was found.')
try:
import pystac
import rasterio
import mimetypes
import pandas as pd
import matplotlib.pyplot as plt
from minio import Minio
from PIL import Image
from itertools import chain
from datetime import datetime
from pyproj import Transformer
from matplotlib.colors import ListedColormap
from shapely.geometry import Polygon, mapping, shape
from pystac.extensions.item_assets import ItemAssetsExtension
[docs] class STACGenerator():
"""
Generator able to access a remote Google Spreadsheet [1] containing several
raster layer metadata (e.g. name, description, cloud-optimized GeoTIFF URL)
and produce multiple SpatioTemporal Asset Catalogs (STAC) instances in a
local folder and / or remote S3 bucket [2,3].
The COG files need to be publicly accessible to HTTP and compatible the Geo-harmonizer
file naming convention [4]. The thumbnails are produced for every COG
according to color scheme defined by columns ``thumb_cmap``, ``thumb_vmin``, ``thumb_vmax``.
:param gsheet: Object representation of a Google Spreadsheet containing the metadata.
:param url_date_format: Date format expected in the COG URL (``strftime``).
:param cog_level: COG overview level used to generate the thumbnail.
:param thumb_overwrite: Overwrite the thumbnail files if exists.
:param asset_id_delim: Field delimiter used to split the COG filename [4].
:param asset_id_fields: Fields retrieved from COG filename used to compose the asset id.
:param catalogs: Used to pass a dictionary (``catalog_id`` as key and
``pystac.catalog.Catalog`` as value) for update operation in pre-existing catalogs.
:param verbose: Use ``True`` to print the progress of all steps.
References
==========
[1] `ODSE Raster layer metadata example <https://docs.google.com/spreadsheets/d/10tAhEpZ7TYPD0UWhrI0LHcuIzGZNt5AgSjx2Bu-FciU>`_
[2] `ODSE STAC Catalog <https://s3.eu-central-1.wasabisys.com/stac/odse/catalog.json>`_
[3] `ODSE STAC Browser <http://stac.opendatascience.eu>`_
[4] `Geo-harmonizer file naming convention <https://gitlab.com/geoharmonizer_inea/spatial-layers>`_
"""
def __init__(self,
gsheet:GoogleSheet,
url_date_format = '%Y.%m.%d',
cog_level = 7,
thumb_overwrite = False,
asset_id_delim = '_',
asset_id_fields = [1,3,5],
catalogs = None,
verbose = False
):
self.cog_level = cog_level
self.thumb_overwrite = thumb_overwrite
self.gsheet = gsheet
self.url_date_format = url_date_format
self.verbose = verbose
self.asset_id_delim = asset_id_delim
self.asset_id_fields = asset_id_fields
coll_columns = gsheet.collections.columns
self.additional_url_cols = list(coll_columns[pd.Series(coll_columns).str.startswith('url')])
self.gdal_env = {
"GDAL_CACHEMAX": 1024_000_000, # 1 Gb in bytes
"GDAL_DISABLE_READDIR_ON_OPEN": False,
"VSI_CACHE": True,
"VSI_CACHE_SIZE": 1024_000_000,
"GDAL_HTTP_MAX_RETRY": 3,
"GDAL_HTTP_RETRY_DELAY": 60
}
self.fields = {
'collection': ['id', 'title', 'description', 'license','keywords'],
'provider': ['name', 'description', 'roles', 'url'],
'catalog': ['id', 'title', 'description'],
'common_metadata': ['constellation', 'platform', 'instruments', 'gsd'],
'internal': ['start_date', 'end_date', 'date_step', 'date_unit', 'date_style','catalog', 'providers', 'main_url']
}
self.fields['internal'] += self.additional_url_cols
self.fields_lookup = dict.fromkeys(
set(chain(*[self.fields[key] for key in self.fields.keys()]))
, True
)
self.providers = self._providers()
if catalogs is None:
self.catalogs = self._catalogs()
else:
self.catalogs = catalogs
self._populate()
def _verbose(self, *args, **kwargs):
if self.verbose:
ttprint(*args, **kwargs)
def _providers(self):
providers = {}
for i,p in self.gsheet.providers.iterrows():
providers[p['name']] = pystac.Provider(**self._kargs(p, 'provider'))
return providers;
def _catalogs(self):
catalogs = {}
for i,p in self.gsheet.catalogs.iterrows():
catalogs[p['id']] = pystac.Catalog(**self._kargs(p, 'catalog'))
return catalogs;
def _fetch_collection(self, key, i, row, bbox_footprint_results):
items = []
for start_date, end_date in self._gen_dates(**row):
main_url = self._parse_url(row['main_url'], start_date, end_date, row['date_unit'], row['date_style'])
additional_urls = []
for ac_url in self.additional_url_cols:
if row[ac_url]:
additional_urls.append(self._parse_url(row[ac_url], start_date, end_date, row['date_unit'], row['date_style']))
bbox, footprint = bbox_footprint_results[main_url]
items.append(self._new_item(row, start_date, end_date, main_url, bbox, footprint, additional_urls))
return (key, row, items)
def _populate(self):
self.new_collections = {}
groups = self.gsheet.collections.groupby('catalog')
args = []
for key in groups.groups.keys():
for i, row in groups.get_group(key).iterrows():
for start_date, end_date in self._gen_dates(**row):
main_url = self._parse_url(row['main_url'], start_date, end_date, row['date_unit'], row['date_style'])
args.append((main_url,))
bbox_footprint_results = {}
for url, bbox, footprint in parallel.job(self._bbox_and_footprint, args, n_jobs=-1, joblib_args={'backend': 'multiprocessing'}):
bbox_footprint_results[url] = (bbox, footprint)
collection_args = []
for key in groups.groups.keys():
for i, row in groups.get_group(key).iterrows():
collection_args.append((key, i, row, bbox_footprint_results))
for key, row, items in parallel.job(self._fetch_collection, collection_args, n_jobs=-1, joblib_args={'backend': 'multiprocessing'}):
collection = self._new_collection(row, items)
if collection is None:
self._verbose(f"Faile to create the collection {row['id']}")
else:
item = self.catalogs[key].get_child(collection.id)
if item is not None:
self.catalogs[key].remove_child(collection.id)
self.catalogs[key].add_child(collection)
if key not in self.new_collections:
self.new_collections[key] = []
self.new_collections[key].append(collection)
self._verbose(f"Creating collection {collection.id} with {len(items)}")
def _get_val(self, dicty, key):
if key in dicty:
return dicty[key]
else:
return None
def _is_data(self, asset):
for r in asset.roles:
if r == 'data':
return True
return False
def _generate_thumbs(self, output_dir='./stac', thumb_base_url=None):
for key, colls in self.new_collections.items():
args = []
catalog = self.catalogs[key]
for coll in colls:
for item in coll.get_all_items():
assets = [ a for a in item.assets.items() ]
collection = item.get_collection()
cmap = self._get_val(collection.extra_fields, 'thumb_cmap')
vmin = self._get_val(collection.extra_fields, 'thumb_vmin')
vmax = self._get_val(collection.extra_fields, 'thumb_vmax')
if ',' in cmap:
colors = cmap.split(',')
cmap = ListedColormap(colors)
for key, asset in assets:
if self._is_data(asset):
thumb_fn = Path(output_dir) \
.joinpath(catalog.id) \
.joinpath(item.collection_id) \
.joinpath(item.id) \
.joinpath(Path(asset.href).stem + '.png')
thumb_fn.parent.mkdir(parents=True, exist_ok=True)
if 'main' in asset.extra_fields:
args.append((asset.href, str(thumb_fn), item.id, True, cmap, vmin, vmax))
else:
args.append((asset.href, str(thumb_fn), item.id, False))
self._verbose(f"Generating {len(args)} thumbnails for catalog {catalog.id}")
for thumb_fn, item_id, is_thumb_url in parallel.job(self._thumbnail, args, n_jobs=-1):
item = catalog.get_item(item_id, True)
if thumb_fn is not None:
thumd_id = self._asset_id(thumb_fn, self.asset_id_delim, self.asset_id_fields) + '_preview'
roles = []
if is_thumb_url:
thumd_id = 'thumbnail'
roles = ['thumbnail']
if thumb_base_url is not None:
thumb_fn = str(thumb_fn).replace(str(output_dir), thumb_base_url)
item.add_asset(thumd_id, pystac.Asset(href=thumb_fn, media_type=pystac.MediaType.PNG, roles=roles))
def _thumbnail(self, url, thumb_fn, item_id, is_thumb_url=False, cmap = 'binary', vmin = None, vmax = None):
if not self.thumb_overwrite and Path(thumb_fn).exists():
#self._verbose(f'Skipping {thumb_fn}')
return (thumb_fn, item_id, is_thumb_url)
r = requests.head(url)
if not (r.status_code == 200):
return(None, item_id, is_thumb_url)
with rasterio.open(url) as src:
try:
oviews = src.overviews(1)
if len(oviews) == 0:
return(None, item_id, is_thumb_url)
cog_level = self.cog_level
if cog_level >= len(oviews):
cog_level = -1
oview = oviews[cog_level]
result = src.read(1, out_shape=(1, src.height // oview, src.width // oview)).astype('float32')
result[result==src.nodata] = np.nan
if vmin is None:
perc = np.nanpercentile(result,[8,92])
vmin, vmax = perc[0], perc[1]
fig, ax = plt.subplots(figsize=(1, 1))
ax.axis('off')
plt.imsave(thumb_fn, result, cmap=cmap, vmin=vmin, vmax=vmax)
basewidth = 675
img = Image.open(thumb_fn)
wpercent = (basewidth/float(img.size[0]))
hsize = int((float(img.size[1])*float(wpercent)))
img = img.resize((basewidth,hsize), Image.ANTIALIAS)
img.save(thumb_fn)
return (thumb_fn, item_id, is_thumb_url)
except:
return(None, item_id, is_thumb_url)
def _new_collection(self, row, items):
if len(items) > 0:
unioned_footprint = shape(items[0].geometry)
collection_bbox = list(unioned_footprint.bounds)
start_date = items[0].properties['start_datetime']
end_date = items[-1].properties['end_datetime']
collection_interval = sorted([
datetime.strptime(start_date,"%Y-%m-%d"),
datetime.strptime(end_date,"%Y-%m-%d")
])
collection = pystac.Collection(
extent=pystac.Extent(
spatial=pystac.SpatialExtent(bboxes=[collection_bbox]),
temporal=pystac.TemporalExtent(intervals=[collection_interval])
),
providers=[ self.providers[p] for p in row['providers'] ],
stac_extensions=['https://stac-extensions.github.io/item-assets/v1.0.0/schema.json'],
**self._kargs(row, 'collection', True)
)
#itemasset_ext = ItemAssetsExtension.ext(collection)
#print(itemasset_ext.item_assets)
collection.add_items(items)
return collection
else:
return None
def _asset_id(self, url, delim = None, id_fields = None):
if delim is None:
return Path(url).name
fields = Path(url).name.split(delim)
result = []
for f in id_fields:
if f < len(fields):
result.append(fields[f])
return delim.join(result)
def _new_item(self, row, start_date, end_date, main_url, bbox, footprint, additional_urls = []):
start_date_str = start_date.strftime("%Y-%m-%d")
end_date_str = end_date.strftime("%Y-%m-%d")
start_date_url_str = start_date.strftime(self.url_date_format)
end_date_url_str = end_date.strftime(self.url_date_format)
item_id = f'{row["id"]}_{start_date_url_str}..{end_date_url_str}'
item = pystac.Item(id=item_id,
geometry=footprint,
bbox=bbox,
datetime=start_date,
properties={'start_datetime': start_date_str, 'end_datetime': end_date_str},
stac_extensions=["https://stac-extensions.github.io/eo/v1.0.0/schema.json"])
item.common_metadata.gsd = row['gsd']
item.common_metadata.instruments = row['instruments']
if 'platform' in row and row['platform']:
item.common_metadata.platform = row['platform']
if 'constellation' in row:
item.common_metadata.constellation = row['constellation']
#eo_ext = EOExtension.ext(item)
#eo_ext.apply(bands=[
#Band.create(name='EVI', description='Enhanced vegetation index', common_name='evi')
#])
item.add_asset(self._asset_id(main_url, self.asset_id_delim, self.asset_id_fields), \
pystac.Asset(href=main_url, media_type=pystac.MediaType.GEOTIFF, roles=['data'], extra_fields={'main': True}))
for aurl in additional_urls:
item.add_asset(self._asset_id(aurl, self.asset_id_delim, self.asset_id_fields), \
pystac.Asset(href=aurl, media_type=pystac.MediaType.GEOTIFF, roles=['data']))
return item
def _gen_dates(self, start_date, end_date, date_unit, date_step, ignore_29feb, **kwargs):
result = []
if date_unit == 'static':
result.append((
start_date,
end_date
))
elif date_unit == 'custom_multiannual':
## Irregular/custom date iteration
for year_range in date_step.split(','):
year_range_arr = year_range.split('..')
start_year = year_range_arr[0]
end_year = year_range_arr[1]
result.append((
datetime.strptime(f'{start_year}.01.01', self.url_date_format),
datetime.strptime(f'{end_year}.12.31', self.url_date_format)
))
elif date_unit == 'custom_predefined':
## Irregular/custom date iteration
for dt_range in date_step.split(','):
dt_range_arr = dt_range.split('..')
start_year = dt_range_arr[0]
end_year = dt_range_arr[1]
result.append((
datetime.strptime(f'{start_year}', self.url_date_format),
datetime.strptime(f'{end_year}', self.url_date_format)
))
else:
dt1 = start_date
while(dt1 <= end_date):
## Regular date iteration
if date_unit == 'custom_intraannual':
## Regular yearly iteration and irregular/custom intraannual date iteration
for date_range in date_step.split(','):
date_step_arr = date_range.split('..')
start_dt = date_step_arr[0]
end_dt = date_step_arr[1]
year = int(dt1.strftime('%Y'))
y, y_m1, y_p1 = str(year), str((year - 1)), str((year + 1))
start_dt = start_dt.replace('{year}', y) \
.replace('{year_minus_1}', y_m1) \
.replace('{year_plus_1}', y_p1)
end_dt = end_dt.replace('{year}', y) \
.replace('{year_minus_1}', y_m1) \
.replace('{year_plus_1}', y_p1)
result.append((
datetime.strptime(start_dt, self.url_date_format),
datetime.strptime(end_dt, self.url_date_format)
))
dt1 = dt1 + relativedelta(years=+1)
else:
## Regular date iteration (yearly, monthly, daily, etc)
delta_args = {}
delta_args[date_unit] = int(date_step) # TODO: Threat the value "month"
dt1n = dt1 + relativedelta(**delta_args)
dt2 = dt1n + relativedelta(days=-1)
if (ignore_29feb.lower() == 'true' and dt2.strftime("%m") == '02' and dt2.strftime("%d") == '29'):
dt2 = dt2 + relativedelta(days=-1)
result.append((dt1, dt2))
dt1 = dt1n
return result
def _kargs(self, row, key, add_extra_fields=False):
_args = {}
for f in self.fields[key]:
if f in row:
_args[f] = row[f]
if add_extra_fields:
_args['extra_fields'] = {}
for ef in row.keys():
if ef not in self.fields_lookup and row[ef]:
_args['extra_fields'][ef] = row[ef]
return _args
def _parse_url(self, url, dt1, dt2, date_unit = 'months', date_style = 'interval'):
date_format = self.url_date_format
if (date_unit == 'years' or date_unit == 'custom_multiannual'):
date_format = '%Y'
if (date_style == 'start_date'):
dt = f'{dt1.strftime(date_format)}'
elif (date_style == 'end_date'):
dt = f'{dt2.strftime(date_format)}'
else:
dt = f'{dt1.strftime(date_format)}..{dt2.strftime(date_format)}'
item_id = str(Path(url).name) \
.replace('{dt}','') \
.replace('__', '_')
return _eval(url, locals())
def _bbox_and_footprint(self, raster_fn):
self._verbose(f'Accesing COG bounds ({raster_fn})')
r = requests.head(raster_fn)
if not (r.status_code == 200):
self._verbose(f'The file {raster_fn} not exists')
return(None, None, None)
with rasterio.Env(**self.gdal_env) as rio_env:
with rasterio.open(raster_fn) as ds:
transformer = Transformer.from_crs(ds.crs, "epsg:4326", always_xy=True)
bounds = ds.bounds
left_wgs84, bottom_wgs84 = transformer.transform(bounds.bottom, bounds.left)
right_wgs84, top_wgs84 = transformer.transform(bounds.top, bounds.right)
bbox = [left_wgs84, bottom_wgs84, right_wgs84, top_wgs84]
footprint = Polygon([
[left_wgs84, bottom_wgs84],
[left_wgs84, top_wgs84],
[right_wgs84, top_wgs84],
[right_wgs84, bottom_wgs84]
])
return (raster_fn, bbox, mapping(footprint))
[docs] def save_all(self,
output_dir:str = 'stac',
catalog_type = pystac.CatalogType.SELF_CONTAINED,
thumb_base_url = None
):
"""
Save the STAC instance to local folder.
:param output_dir: Destination folder.
:param catalog_type: Normalization strategy defined
by ``pystac.CatalogType``.
:param thumb_base_url: Base urls for the thumbnail
files. Useful in cases where the COG files are
hosted in a different location (S3 bucket) of
STAC files.
Examples
========
>>> from eumap.misc import GoogleSheet
>>> from eumap.datasets.eo import STACGenerator
>>>
>>> # Generate your key follow the instructions in https://docs.gspread.org/en/latest/oauth2.html
>>> key_file = '<GDRIVE_KEY>'
>>> # Public accessible Google Spreadsheet (Anyone on the internet with this link can view)
>>> url = 'https://docs.google.com/spreadsheets/d/10tAhEpZ7TYPD0UWhrI0LHcuIzGZNt5AgSjx2Bu-FciU'
>>>
>>> gsheet = GoogleSheet(key_file, url, verbose=True)
>>> stac_generator = STACGenerator(gsheet, asset_id_fields=[1,2,3,5], catalogs=catalogs, verbose=True)
>>> stac_generator.save_all(output_dir='stac_odse', thumb_base_url=f'https://s3.eu-central-1.wasabisys.com/stac')
"""
output_dir = Path(output_dir)
self._generate_thumbs(output_dir, thumb_base_url)
for key, catalog in self.catalogs.items():
catalog.normalize_and_save(
root_href=str(output_dir.joinpath(catalog.id)),
catalog_type=catalog_type
)
def _s3_fput_object(self, fpath, output_dir, client, s3_bucket_name, s3_prefix):
if(fpath.is_file()):
object_name = fpath.relative_to(output_dir.name)
if s3_prefix:
object_name = Path(s3_prefix).joinpath(object_name)
content_type,_ = mimetypes.guess_type(fpath)
client.fput_object(s3_bucket_name, str(object_name), str(fpath), content_type=content_type)
return True
else:
return False
[docs] def save_and_publish_all(self,
s3_host:str,
s3_access_key:str,
s3_access_secret:str,
s3_bucket_name:str,
s3_prefix:str = '',
output_dir:str = 'stac',
catalog_type = pystac.CatalogType.SELF_CONTAINED
):
"""
Save the STAC instance to local folder and upload all the files
to a s3 bucket.
:param s3_host: Hostname of a S3 service.
:param s3_access_key: Access key (aka user ID) of S3 service.
:param s3_access_secret: Secret key (aka user ID) of S3 service.
:param s3_bucket_name: Name of the bucket.
:param s3_prefix: Object name prefix (URL part) in the bucket.
:param output_dir: Destination folder.
:param catalog_type: Normalization strategy defined
by ``pystac.CatalogType``.
Examples
========
>>> s3_host = "<S3_HOST>"
>>> s3_access_key = "<s3_access_key>"
>>> s3_access_secret = "<s3_access_secret>"
>>> s3_bucket_name = 'stac'
>>>
>>> # Generate your key follow the instructions in https://docs.gspread.org/en/latest/oauth2.html
>>> key_file = '<GDRIVE_KEY>'
>>> # Public accessible Google Spreadsheet (Anyone on the internet with this link can view)
>>> url = 'https://docs.google.com/spreadsheets/d/10tAhEpZ7TYPD0UWhrI0LHcuIzGZNt5AgSjx2Bu-FciU'
>>>
>>> gsheet = GoogleSheet(key_file, url)
>>> stac_generator = STACGenerator(gsheet, verbose=True)
>>> stac_generator.save_and_publish_all(s3_host, s3_access_key, s3_access_secret, s3_bucket_name)
"""
thumb_base_url = f'https://{s3_host}/{s3_bucket_name}'
self.save_all(output_dir, catalog_type=catalog_type, thumb_base_url=thumb_base_url)
output_dir = Path(output_dir)
files = find_files([output_dir, '*.*'])
client = Minio(s3_host, s3_access_key, s3_access_secret, secure=True)
args = [ (fpath, output_dir, client, s3_bucket_name, s3_prefix) for fpath in files ]
self._verbose(f"Copying {len(files)} files to {s3_host}/{s3_bucket_name}/{s3_prefix}")
for r in parallel.job(self._s3_fput_object, args, n_jobs=10, joblib_args={'backend': 'threading'}):
continue
self._verbose(f"End")
except Exception as e:
_warn_deps(e, 'eo.STACGenerator')
try:
from pystac import Catalog
from whoosh.qparser import QueryParser
from whoosh.index import create_in
from whoosh import fields
[docs] class STACIndex():
"""
Enable full-text search for static SpatioTemporal Asset Catalogs (STAC) using whoosh.
:param catalog: STAC catalog instance ``pystac.catalog.Catalog``.
:param index_dir: Output folder to store the whoosh index files.
:param verbose: Use ``True`` to print the progress of all steps.
"""
def __init__(self,
catalog:Catalog,
index_dir = 'stac_index',
verbose = False
):
self.catalog = catalog
self.index_dir = index_dir
self.verbose = verbose
self.ix = self._index()
def _index(self):
Path(self.index_dir).mkdir(parents=True, exist_ok=True)
ix = create_in(self.index_dir, fields.Schema(title=fields.TEXT(stored=True),
id=fields.ID(stored=True), description=fields.TEXT))
self._verbose(f'Retriving all collections from {self.catalog.title}')
collections = list(self.catalog.get_collections())
self._verbose(f'Creating index for {len(collections)} collections')
writer = ix.writer()
for collection in collections:
writer.add_document(
title=collection.title,
id=collection.id,
description=collection.title + ' ' + collection.description
)
writer.commit()
return ix
[docs] def search(self,
query,
field='title'
):
"""
Full-text Search over the title and / or description of the collections.
:param query: Search text.
:param field: Can be ``title`` or ``description``.
"""
result = []
with self.ix.searcher() as searcher:
parser = QueryParser(field, self.ix.schema)
myquery = parser.parse(query)
i = 0
for r in searcher.search(myquery):
result.append({
'pos': i,
'id': r['id'],
'title': r['title']
})
i += 1
return result
def _verbose(self, *args, **kwargs):
if self.verbose:
ttprint(*args, **kwargs)
except Exception as e:
_warn_deps(e, 'eo.STACGenerator')