eumap.parallel.utils.TaskSequencer
- class TaskSequencer(tasks, mem_usage_limit=0.75, wait_timeout=5, verbose=False)
Bases: object
Execute a pipeline of sequential tasks in which the output of one task is used as input for the next task. A pool of workers is created for each task, so all the available workers can run in parallel on different portions of the input data.
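The chaining idea can be illustrated with the standard library alone. This is a minimal sketch and not the eumap implementation: the `load` and `combine` functions are hypothetical, and the use of thread pools (rather than processes) is an assumption made to keep the example self-contained.

```python
from concurrent.futures import ThreadPoolExecutor


def load(x):
    # Hypothetical first task: pretend to read a data item.
    return x, x * 2


def combine(x, doubled):
    # Hypothetical second task: consumes the tuple returned by load().
    return x + doubled


def run_pipeline(items):
    # One pool per task; the second pool has two workers, the first has one.
    with ThreadPoolExecutor(max_workers=1) as pool_1, \
         ThreadPoolExecutor(max_workers=2) as pool_2:
        first = [pool_1.submit(load, x) for x in items]
        # Each first-task result is handed to the second pool in input order.
        second = [pool_2.submit(combine, *f.result()) for f in first]
        return [f.result() for f in second]


results = run_pipeline([1, 2, 3])
print(results)
```

The second task starts working on early items while the first task is still producing later ones, which is the overlap a per-task pool is meant to provide.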
- Parameters
tasks (Union[List[Callable], List[tuple]]) – Task definition list, where each element can be: (1) a Callable function; (2) a tuple containing a Callable function and the number of workers for the task; or (3) a tuple containing a Callable function, the number of workers, and a bool indicating whether the task respects the mem_usage_limit. The default number of workers is 1.
mem_usage_limit (float) – Memory usage level (0.75 by default) that, when reached, triggers a momentary pause in the execution of specific tasks. For example, if task_1 is responsible for reading the data and task_2 for processing it, the task_1 definition can receive a bool indication to respect the mem_usage_limit, allowing task_2 to process the data that has already been read and release memory for the next task_1 reads.
wait_timeout (int) – Timeout argument used by concurrent.futures.wait.
verbose (bool) – Use True to print the communication and status of the tasks.
Examples
>>> from eumap.parallel import TaskSequencer
>>>
>>> output = TaskSequencer(
>>>     tasks=[
>>>         task_1,
>>>         (task_2, 2)
>>>     ]
>>> )
Pipeline produced by this example code:
>>>               ----------      ----------
>>> input_data -> | task_1 |  ->  | task_2 |  -> output_data
>>>               ----------      ----------
>>>               |               |
>>>               |-worker_1      |-worker_1
>>>                               |-worker_2
Methods
- run(input_data)
Run the task pipeline considering the input_data argument.
- Parameters
input_data (List[tuple]) – Input data used to feed the first task.
- Returns
List of values returned by the last task, with the same size as the input_data argument.
- Return type
List
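To show how a timeout passed to concurrent.futures.wait fits a run loop that returns one value per input tuple, here is a minimal sketch. It is not the eumap code: `run_with_polling` and `square` are hypothetical names, a thread pool is assumed, and storing results by input position is a choice of this sketch.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def square(x):
    # Hypothetical task used only for illustration.
    return x * x


def run_with_polling(func, input_data, n_workers=2, wait_timeout=5):
    results = [None] * len(input_data)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # Each input tuple is unpacked as the arguments of one call.
        positions = {pool.submit(func, *args): i
                     for i, args in enumerate(input_data)}
        pending = set(positions)
        while pending:
            # wait() returns after the timeout even if nothing finished,
            # giving the loop a chance to report status between polls.
            done, pending = wait(pending, timeout=wait_timeout,
                                 return_when=FIRST_COMPLETED)
            for future in done:
                results[positions[future]] = future.result()
    return results


values = run_with_polling(square, [(1,), (2,), (3,)], wait_timeout=1)
print(values)
```

A shorter timeout makes the loop wake up more often; it does not cancel tasks that are still running.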
Examples
>>> from eumap.misc import ttprint
>>> from eumap.parallel import TaskSequencer
>>> import numpy as np
>>> import time
>>>
>>> def rnd_data(const, size):
>>>     data = np.random.rand(size, size, size)
>>>     time.sleep(2)
>>>     return (const, data)
>>>
>>> def max_value(const, data):
>>>     ttprint(f'Calculating the max value over {data.shape}')
>>>     time.sleep(8)
>>>     result = np.max(data + const)
>>>     return result
>>>
>>> taskSeq = TaskSequencer(
>>>     tasks=[
>>>         rnd_data,
>>>         (max_value, 2)
>>>     ],
>>>     verbose=True
>>> )
>>>
>>> taskSeq.run(input_data=[ (const, 10) for const in range(0,3) ])
>>> taskSeq.run(input_data=[ (const, 20) for const in range(3,6) ])