eumap.parallel.utils.TaskSequencer
- class TaskSequencer(tasks, mem_usage_limit=0.75, wait_timeout=5, verbose=False)
Bases: object
Execute a pipeline of sequential tasks, in which the output of one task is used as the input of the next. A pool of workers is created for each task, so all available workers can run in parallel on different portions of the input data.
- Parameters
tasks (Union[List[Callable], List[tuple]]) – Task definition list, where each element can be: (1) a Callable function; (2) a tuple containing a Callable function and the number of workers for the task; or (3) a tuple containing a Callable function, the number of workers, and a bool indicating whether the task should respect mem_usage_limit. The default number of workers is 1.
mem_usage_limit (float) – Fraction of memory usage (for example, 0.75 for 75%) that, when reached, temporarily pauses execution of specific tasks. For example, if task_1 is responsible for reading the data and task_2 for processing it, the task_1 definition can receive a bool indicating that it should respect mem_usage_limit. This lets task_2 process the data that has already been read, releasing memory for the next task_1 reads.
wait_timeout (int) – Timeout, in seconds, passed to concurrent.futures.wait.
verbose (bool) – Use True to print the communication and status of the tasks.
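The three accepted task-definition forms can be illustrated with a small normalization routine. This is only a sketch: normalize_task is a hypothetical helper, not part of eumap, and the default of False for the memory-limit flag is an assumption, since the documentation above only states the default number of workers.

```python
from typing import Callable, Tuple, Union

TaskDef = Union[Callable, tuple]


def normalize_task(task: TaskDef) -> Tuple[Callable, int, bool]:
    """Hypothetical helper: expand a task definition into
    (function, n_workers, respect_mem_limit)."""
    if callable(task):
        # Form (1): a bare callable, using the documented default of 1 worker.
        # The False default for the memory flag is an assumption.
        return task, 1, False
    if len(task) == 2:
        # Form (2): (function, number of workers).
        func, n_workers = task
        return func, n_workers, False
    if len(task) == 3:
        # Form (3): (function, number of workers, respect mem_usage_limit).
        func, n_workers, respect_mem_limit = task
        return func, n_workers, bool(respect_mem_limit)
    raise ValueError(f"Unsupported task definition: {task!r}")
```

With this sketch, `normalize_task((len, 2))` yields `(len, 2, False)`, while `normalize_task((len, 2, True))` marks the task as one that should pause when the memory limit is reached.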
Examples
>>> from eumap.parallel import TaskSequencer
>>>
>>> output = TaskSequencer(
>>>     tasks=[
>>>         task_1,
>>>         (task_2, 2)
>>>     ]
>>> )
Pipeline produced by this example code:
>>>               ----------      ----------
>>> input_data -> | task_1 |  ->  | task_2 |  -> output_data
>>>               ----------      ----------
>>>               |               |
>>>               |-worker_1      |-worker_1
>>>                               |-worker_2
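The pipeline idea described above, one worker pool per task with each result handed to the next task, can be approximated with the standard library alone. The following is a minimal sketch and not eumap's implementation: run_pipeline, _call_with, load and combine are hypothetical names, threads stand in for whatever worker type the library uses, memory limiting is omitted, and unpacking a tuple result into the next task's positional arguments is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def _call_with(func, prev_future):
    # Block until the upstream stage has finished this item.
    result = prev_future.result()
    # Assumption: a tuple result is unpacked into positional arguments.
    args = result if isinstance(result, tuple) else (result,)
    return func(*args)


def run_pipeline(tasks, input_data):
    """Hypothetical sketch: tasks is a list of (function, n_workers) pairs,
    input_data is a list of argument tuples for the first task."""
    # One pool per stage, sized by that stage's worker count.
    pools = [ThreadPoolExecutor(max_workers=n) for _, n in tasks]
    try:
        first_func = tasks[0][0]
        futures = [pools[0].submit(first_func, *args) for args in input_data]
        # Chain every later stage onto the futures of the previous one.
        for (func, _), pool in zip(tasks[1:], pools[1:]):
            futures = [pool.submit(_call_with, func, f) for f in futures]
        return [f.result() for f in futures]
    finally:
        for pool in pools:
            pool.shutdown()


def load(x):
    # Stand-in for a "read" task: returns a tuple for the next stage.
    return (x, x * 10)


def combine(a, b):
    # Stand-in for a "process" task.
    return a + b
```

Calling `run_pipeline([(load, 1), (combine, 2)], [(1,), (2,), (3,)])` mirrors the diagram: one worker for the first task and two for the second, with one result per input item.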
Methods
run(input_data) – Run the task pipeline considering the input_data argument.
- run(input_data)
Run the task pipeline considering the input_data argument.
- Parameters
input_data (List[tuple]) – Input data used to feed the first task.
- Returns
List of values returned by the last task, with the same length as the input_data argument.
- Return type
List
Examples
>>> from eumap.misc import ttprint
>>> from eumap.parallel import TaskSequencer
>>> import numpy as np
>>> import time
>>>
>>> def rnd_data(const, size):
>>>     data = np.random.rand(size, size, size)
>>>     time.sleep(2)
>>>     return (const, data)
>>>
>>> def max_value(const, data):
>>>     ttprint(f'Calculating the max value over {data.shape}')
>>>     time.sleep(8)
>>>     result = np.max(data + const)
>>>     return result
>>>
>>> taskSeq = TaskSequencer(
>>>     tasks=[
>>>         rnd_data,
>>>         (max_value, 2)
>>>     ],
>>>     verbose=True
>>> )
>>>
>>> taskSeq.run(input_data=[ (const, 10) for const in range(0,3) ])
>>> taskSeq.run(input_data=[ (const, 20) for const in range(3,6) ])