eumap.parallel.utils.TaskSequencer

class TaskSequencer(tasks, mem_usage_limit=0.75, wait_timeout=5, verbose=False)[source]

Bases: object

Executes a pipeline of sequential tasks in which the output of one task is used as the input of the next. Each task gets its own pool of workers, so the workers of all tasks can run in parallel on different portions of the input data.

Parameters
  • tasks (Union[List[Callable], List[tuple]]) – Task definition list, where each element can be: (1) a Callable; (2) a tuple containing a Callable and the number of workers for the task; or (3) a tuple containing a Callable, the number of workers, and a bool indicating whether the task should respect mem_usage_limit. The default number of workers is 1.

  • mem_usage_limit (float) – Memory usage threshold, expressed as a fraction (0.75 by default), that when reached temporarily pauses the execution of the tasks flagged to respect it. For example, if task_1 reads the data and task_2 processes it, the task_1 definition can carry the bool flag to respect mem_usage_limit. task_1 then pauses while task_2 processes the data that has already been read, which releases memory for the next task_1 reads.

  • wait_timeout (int) – Timeout argument used by concurrent.futures.wait.

  • verbose (bool) – Use True to print the communication and status of the tasks.
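The three accepted task forms and the memory threshold can be illustrated with a small standard-library sketch. Both helpers below (`normalize_task` and `wait_for_memory`) are hypothetical names and are not part of eumap. The default of False for the memory flag and the `read_usage` callable, which stands in for whatever reports the fraction of memory in use, are assumptions made for the illustration.

```python
import time


def normalize_task(task):
    """Expand a task definition into (func, n_workers, respect_mem_limit).

    Hypothetical helper, not part of eumap. The False default for the
    memory flag is an assumption; the documentation only states the
    default number of workers (1).
    """
    if callable(task):
        return task, 1, False
    if len(task) == 2:
        func, n_workers = task
        return func, n_workers, False
    func, n_workers, respect_mem_limit = task
    return func, n_workers, bool(respect_mem_limit)


def wait_for_memory(read_usage, mem_usage_limit=0.75, poll_seconds=0.01):
    """Block while read_usage() reports usage at or above the limit.

    read_usage is an assumed callable returning the memory in use as a
    fraction between 0 and 1. Returns how many times the caller waited.
    """
    n_waits = 0
    while read_usage() >= mem_usage_limit:
        time.sleep(poll_seconds)
        n_waits += 1
    return n_waits


def read_block(path):
    return path


# A bare callable, a (callable, workers) pair, and a full triple.
plain = normalize_task(read_block)
pair = normalize_task((read_block, 2))
triple = normalize_task((read_block, 1, True))

# Simulated readings: two above the threshold, then one below it.
readings = iter([0.9, 0.8, 0.5])
waits = wait_for_memory(lambda: next(readings), mem_usage_limit=0.75, poll_seconds=0)
```

In this sketch only a task whose third element is True would call `wait_for_memory` before starting new work, which matches the read-then-process scenario described for mem_usage_limit.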

Examples

>>> from eumap.parallel import TaskSequencer
>>> 
>>> output = TaskSequencer(
>>>     tasks=[
>>>         task_1,
>>>         (task_2, 2)
>>>     ]
>>> ).run(input_data)

Pipeline produced by this example code:

>>>                ----------      ----------
>>> input_data ->  | task_1 |  ->  | task_2 |  ->  output_data
>>>                ----------      ----------
>>>                |               |
>>>                |-worker_1      |-worker_1
>>>                                |-worker_2
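The same pipeline shape can be reproduced with the standard library alone, which may help to see how per-task worker pools interact. This is a minimal sketch and not eumap's implementation: `run_pipeline` is a hypothetical function, it uses threads, and it assumes that every intermediate result is a tuple that is unpacked as the positional arguments of the next task.

```python
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(tasks, input_data):
    """Push every argument tuple in input_data through all tasks, in order.

    tasks is a list of (callable, n_workers) pairs. Illustrative only.
    """
    if not input_data:
        return []
    # One pool per task, so each task has its own set of workers.
    pools = [ThreadPoolExecutor(max_workers=n) for _, n in tasks]

    def chain(args):
        # Carry one input item through the pipeline. Each intermediate
        # result is unpacked as the arguments of the next task.
        for (func, _), pool in zip(tasks, pools):
            args = pool.submit(func, *args).result()
        return args

    try:
        # Driver threads only wait on the task pools; map() keeps the
        # results in the same order as input_data.
        with ThreadPoolExecutor(max_workers=len(input_data)) as driver:
            return list(driver.map(chain, input_data))
    finally:
        for pool in pools:
            pool.shutdown()


def task_1(a, b):
    return (a + b, a)


def task_2(total, a):
    return total * a


# task_1 runs with one worker, task_2 with two, as in the diagram above.
output = run_pipeline([(task_1, 1), (task_2, 2)], [(1, 2), (3, 4)])
print(output)  # [3, 21]
```

Because each task owns its pool, a slow second task can work on an earlier item while the first task is already handling the next one.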

Methods

run

Run the task pipeline considering the input_data argument.

run(input_data)[source]

Run the task pipeline considering the input_data argument.

Parameters

input_data (List[tuple]) – Input data used to feed the first task.

Returns

List of the values returned by the last task, with the same length as the input_data argument.

Return type

List

Examples

>>> from eumap.misc import ttprint
>>> from eumap.parallel import TaskSequencer
>>> import numpy as np
>>> import time
>>> 
>>> def rnd_data(const, size):
>>>     data = np.random.rand(size, size, size)
>>>     time.sleep(2)
>>>     return (const, data)
>>> 
>>> def max_value(const, data):
>>>     ttprint(f'Calculating the max value over {data.shape}')
>>>     time.sleep(8)
>>>     result = np.max(data + const)
>>>     return result
>>> 
>>> taskSeq = TaskSequencer(
>>>     tasks=[
>>>         rnd_data,
>>>         (max_value, 2)
>>>     ],
>>>     verbose=True
>>> )
>>> 
>>> taskSeq.run(input_data=[ (const, 10) for const in range(0,3) ])
>>> taskSeq.run(input_data=[ (const, 20) for const in range(3,6) ])