Source code for eumap.datasets.eo.s2mosaic.job_executor

import requests
import sys
import socket
from pathlib import Path
from .s2tiler import S2Tiler

# Silence urllib3 warnings for the whole process; the requests made in this
# module pass verify=False, which urllib3 would otherwise warn about.
requests.packages.urllib3.disable_warnings()

# Directory that contains this module.
fld = Path(__file__).resolve().parent

# Path (as a POSIX-style string) to 'cert.pem' in that directory.
# NOTE(review): in the code visible here it is only mentioned in commented-out
# `verify` arguments -- confirm whether it is still needed.
fn_cert = (fld/'cert.pem').as_posix()

class JobExecutorLocal:
    '''
    Runs one S2Tiler job from parameters supplied by the caller.

    Unlike a server-driven executor, nothing here talks to the network: the
    job is described entirely by the keyword arguments given at construction.
    '''

    @classmethod
    def get_new_job(cls, params):
        '''
        Build an executor from a parameter dictionary

        :param params: dictionary that is unpacked into the constructor
        :returns JobExecutorLocal instance
        '''
        return cls(**params)

    def __init__(self, **params):
        '''
        Initialization of instance

        :param params: Parameters of job. ``index`` is removed and kept on the
            instance; everything else (including ``debug``) is handed to S2Tiler.
        '''
        job_index = params.pop('index')
        self.index = job_index
        self.params = params
        self.tiler = S2Tiler(**params)
        self.debug = params['debug']

    def worker(self):
        '''
        Run the tiler and remember whether it answered 'OK'
        '''
        outcome = self.tiler()
        self.result = outcome == 'OK'

    def __call__(self):
        '''
        Announce the job on stdout, run the worker and return the executor itself.
        '''
        print(f'STARTING: index={self.index}, {self.params["out_folder_prefix"]}, {self.params["tile_name"]}')
        sys.stdout.flush()
        self.worker()
        return self

    def submit_job_report(self):
        '''
        Print the outcome of the job; in debug mode also save the log of a failed job.

        The log is written to ``<bucket>/<out_parent_folder>/<tile_name>.log``.
        '''
        # NOTE(review): the nesting below was reconstructed from a listing that
        # had lost its indentation -- confirm that the log write belongs to the
        # debug branch.
        if not self.debug:
            print(f'FINISHED, success={self.result}, index={self.index}, {self.params["out_folder_prefix"]}, {self.params["tile_name"]}')
            return

        print(f'FINISHED TEST: success={self.result}, report=OK')
        if not self.result:
            log_lines = self.tiler.LOG
            bucket = Path(self.params['bucket'])
            parent = Path(self.params['out_parent_folder'])
            log_file = bucket / parent / f'{self.params["tile_name"]}.log'
            log_file.write_text('\n'.join(log_lines))
        sys.stdout.flush()
class JobExecutor:
    '''
    Class that fetches a job from the job server, executes it and sends a report.

    ``submit_job_report`` reads the ``result`` and ``log`` attributes that
    ``worker`` (or calling the instance) sets, so the job has to be run
    before it is reported.
    '''

    # Name of the machine running this code; sent to the server with each request.
    hostname = socket.gethostname()
    # Endpoint queried for the next job.
    satmos_url = 'https://31.45.233.147:5000/api/get_job_v3'
    # Query parameters sent with both the job request and the job report.
    # NOTE(review): the API key is committed in source -- consider loading it
    # from configuration instead.
    satmos_params = {
        'apikey': '22e3a9eaabcadb21a3d3ef98e231a10beb7a7bccb8090f1682184d9b55817bfd9144b855738d2868c00fb5266db0230bc9753c61abac2015e0e9532c5ba8bb50',
        'hostname': hostname}
    # Endpoint that receives the job report.
    satmos_report_url = 'https://31.45.233.147:5000/api/report_job'

    @classmethod
    def get_new_job(cls, add_params=None):
        '''
        Fetches new job from server

        :param add_params: additional parameters for job; when given they are
            merged over the parameters returned by the server
        :returns JobExecutor instance, or None when the server does not
            answer with HTTP status 200
        '''
        # NOTE(review): certificate verification is switched off here
        # (verify=False); a certificate path was once passed instead.
        res = requests.get(cls.satmos_url, cls.satmos_params, verify=False)
        if res.status_code != 200:
            return None
        params = res.json()
        if add_params is not None:
            params.update(add_params)
        return cls(params)

    def __init__(self, params):
        '''
        Initialization of instance

        ``status`` and ``job_id`` are removed from ``params`` (the dictionary
        is modified in place); what is left is passed to S2Tiler.

        When ``status`` is not 'OK' only a message is printed and no other
        attribute is set, so such an instance cannot be called or reported.

        :param params: Parameters of job
        '''
        self.status = params.pop('status')
        if self.status != 'OK':
            print(f'SOMETHING IS WRONG: {self.status}')
        else:
            self.job_id = params.pop('job_id')
            self.params = params
            self.tmp_folder = params.get('tmp_folder', None)
            self.data_folder = params.get('data_folder', None)
            self.tiler = S2Tiler(**self.params)

    def worker(self):
        '''
        Run worker and check for errors

        Stores whether the tiler answered 'OK' in ``self.result`` and the
        tiler's ``LOG`` in ``self.log``.
        '''
        self.result = self.tiler() == 'OK'
        self.log = self.tiler.LOG

    def __call__(self):
        '''
        Call worker for current job.

        :returns the executor itself
        '''
        print(f'STARTING: job_id={self.job_id}, {self.tiler.out_folder_name[0]}')
        sys.stdout.flush()
        self.worker()
        return self

    def submit_job_report(self):
        '''
        Submit job report to server

        A job whose ``job_id`` is -1 is handled as a test: its outcome is only
        printed and nothing is sent. Otherwise the job id, the result (as 0/1)
        and the log joined into one string are posted to the report endpoint,
        and the server's answer is printed.
        '''
        result = self.result
        log = self.log
        if self.job_id == -1:
            print(f'FINISHED TEST: success={result}, report=OK, {self.tiler.out_folder_name[0]}')
        else:
            params = self.satmos_params.copy()
            data = {'job_id': self.job_id, 'result': int(result), 'log': '\n'.join(log)}
            # NOTE(review): certificate verification is switched off here too.
            res = requests.post(self.satmos_report_url, params=params, json=data, verify=False)
            # Keep the decoded body under its own name: the failure message
            # below used to call .json() on the already-decoded dict, which
            # raised AttributeError whenever the server rejected the report.
            reply = res.json()
            if reply.get('status', None) == 'OK':
                print(f'FINISHED: success={result}, report=OK, {self.tiler.out_folder_name[0]}')
            else:
                print(f'FINISHED: success={result}, report={reply}, {self.tiler.out_folder_name[0]}')
        sys.stdout.flush()