diff --git a/src/ansys/dpf/core/incremental.py b/src/ansys/dpf/core/incremental.py
index f03dd4f84e..0694350205 100644
--- a/src/ansys/dpf/core/incremental.py
+++ b/src/ansys/dpf/core/incremental.py
@@ -11,6 +11,26 @@ class IncrementalHelper:
+    """Provides an API to transform an existing workflow into an incrementally evaluating one.
+
+    It works by plugging operators into an incomplete workflow.
+
+    Example
+    -------
+    >>> from ansys.dpf import core as dpf
+    >>> from ansys.dpf.core import examples
+    >>> path = examples.find_msup_transient()
+    >>> ds = dpf.DataSources(path)
+    >>> scoping = dpf.time_freq_scoping_factory.scoping_on_all_time_freqs(ds)
+    >>>
+    >>> result_op = dpf.operators.result.displacement(data_sources=ds, time_scoping=scoping)
+    >>> minmax_op = dpf.operators.min_max.min_max_fc_inc(result_op)
+    >>>
+    >>> new_op = dpf.split_workflow_in_chunks(result_op, minmax_op, scoping, chunk_size=5)
+    >>> min_field = new_op.get_output(0, dpf.types.field)
+    >>> max_field = new_op.get_output(1, dpf.types.field)
+    """
+
     def __init__(
         self,
         start_op: core.Operator,
@@ -18,17 +38,30 @@ def __init__(
         scoping: core.Scoping,
         scoping_pin: int = None,
     ):
-        """
+        """Constructs an IncrementalHelper object.
+
         Given the first and the last operator of a workflow, as well as the scoping.
 
         This class can be used to simplify the use of incremental operators, and
         automatically be enabled to incrementally evaluate a workflow.
 
         Under the constraint that the end_op supports incremental evaluation.
+
+        Parameters
+        ----------
+        start_op : Operator
+            First operator in the workflow to convert
+        end_op : Operator
+            Last operator in the workflow to convert (the operator providing the meaningful output)
+        scoping : Scoping
+            Scoping used to chunk the data
+        scoping_pin : int, optional
+            Pin number of the scoping on the first operator; if omitted, it is deduced from the input types
         """
 
         # Input operator should accept a scoping
         # Last operator should support incremental evaluation
-        # but it should be permissive too (bad doc/spec whatever)
+        # but as we don't have a consistent method to check this,
+        # it should be permissive in case the specification isn't up to date
         self._start_op = start_op
         self._end_op = self._map_to_incremental(end_op)
 
@@ -36,12 +69,20 @@ def __init__(
         self._scoping_pin = self._find_scoping_pin(scoping_pin)
 
     def estimate_size(self, max_bytes: int, _dict_inputs: Dict[int, Any] = {}) -> int:
-        """
+        """Estimates the chunk size from the estimated number of bytes produced in one iteration.
+
         Estimation is based on the size of the output for one ID of the given time_scoping,
         so it will run the operator for only one iteration.
 
         It only supports Field and FieldContainer.
         For other types, you should specify chunk_size argument in the split() method.
+
+        Parameters
+        ----------
+        max_bytes : int
+            Max allowed size of an output from the first operator, for one iteration (in bytes).
+        _dict_inputs : dict[int, Any]
+            Dictionary associating pin number to inputs, for evaluating the output of one iteration.
         """
 
         # Evaluate for the first element to try to guess memory consumption
         # It is best to use with a lot of elements
@@ -107,9 +148,19 @@ def _prerun(self, _dict_inputs: Dict[int, Any]):
     def split(
         self, chunk_size: int, end_input_pin: int = 0, rescope: bool = False
     ) -> core.Operator:
-        """
+        """Integrates the given operators into a new workflow enabling incremental evaluation.
+
+        Given a chunk size (a number of IDs from the given scoping), it provides a new operator
+        to retrieve outputs from, and enables incremental evaluation, notably reducing peak memory usage.
+
+        Parameters
+        ----------
+        chunk_size : int
+            Number of scoping IDs to process in each iteration (chunk)
+        end_input_pin : int, optional
+            Input pin on the last operator that receives the output of the first operator (default = 0)
+        rescope : bool, optional
+            Rescope all the outputs based on the given scoping (default = False)
         """
 
         # Enables incremental evaluation:
         # Using for_each, chunk_in_for_each_range and incremental version of the last operator
@@ -159,6 +210,10 @@ def split(
         return output
 
     def _map_to_incremental(self, end_op: core.Operator):
+        # The goal of this function is to validate that a given operator is indeed incremental.
+        # If an operator is found not to support incremental evaluation, the check must not be
+        # strict: it should only emit warnings, because the list of known incremental operators
+        # used here may, by design, be outdated.
         inc_operators = [
             "accumulate_level_over_label_fc",
             "accumulate_min_over_label_fc",
@@ -221,11 +276,37 @@ def split_workflow_in_chunks(
     scoping_pin: int = None,
     end_input_pin: int = 0,
 ):
-    """
-    This method helps transform a workflow into an incrementally evaluating one.
+    """Transforms a workflow into an incrementally evaluating one.
 
     It wraps in one method the functionality of the IncrementalHelper class
     as well as the estimation of the chunk size.
+
+    If no chunk_size is specified, the function will attempt to estimate the value
+    by calling IncrementalHelper.estimate_size(max_bytes, dict_inputs).
+
+    If no scoping_pin is specified, the function will attempt to deduce the correct pin,
+    which would be the first input pin matching a scoping type.
+
+    Parameters
+    ----------
+    start_op : Operator
+        Initial operator of the workflow to convert
+    end_op : Operator
+        Last operator of the workflow to convert
+    scoping : Scoping
+        Scoping to split across multiple evaluations
+    rescope : bool, optional
+        If enabled, rescopes the final outputs with the given scoping (default = False)
+    max_bytes : int, optional
+        Max allowed size for the output from the first operator (default = 1024**3)
+    dict_inputs : dict[int, Any], optional
+        Inputs to pass to the first operator, used only for the estimation run (default = {})
+    chunk_size : int, optional
+        Maximum number of scoping elements to process in an iteration (default = None)
+    scoping_pin : int, optional
+        The pin number on the first operator to bind the scoping (default = None)
+    end_input_pin : int, optional
+        Input pin on the last operator that receives the output of the first operator (default = 0)
     """
 
     splitter = IncrementalHelper(start_op, end_op, scoping, scoping_pin)
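Note: the class docstring example above exercises only the one-shot split_workflow_in_chunks helper. A minimal sketch of the two-step flow documented in estimate_size() and split() follows, reusing the same msup transient example. The direct import from ansys.dpf.core.incremental is an assumption based on this file's path (only split_workflow_in_chunks is shown at the package level), and the 1 GiB budget mirrors the documented max_bytes default:

    from ansys.dpf import core as dpf
    from ansys.dpf.core import examples
    from ansys.dpf.core.incremental import IncrementalHelper

    # Build the same start/end operators as in the class docstring example.
    path = examples.find_msup_transient()
    ds = dpf.DataSources(path)
    scoping = dpf.time_freq_scoping_factory.scoping_on_all_time_freqs(ds)
    result_op = dpf.operators.result.displacement(data_sources=ds, time_scoping=scoping)
    minmax_op = dpf.operators.min_max.min_max_fc_inc(result_op)

    # Two-step flow: estimate a chunk size under a 1 GiB budget, then split.
    helper = IncrementalHelper(result_op, minmax_op, scoping)
    chunk_size = helper.estimate_size(max_bytes=1024**3)
    new_op = helper.split(chunk_size)
    min_field = new_op.get_output(0, dpf.types.field)
    max_field = new_op.get_output(1, dpf.types.field)

Compared with split_workflow_in_chunks, this makes the estimated chunk size inspectable before the workflow is rebuilt.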
diff --git a/tests/test_incremental.py b/tests/test_incremental.py
index e0d0f67d10..6df9e76b8e 100644
--- a/tests/test_incremental.py
+++ b/tests/test_incremental.py
@@ -50,8 +50,7 @@ def create_wf():
     res_op = core.operators.result.displacement(
         data_sources=ds, time_scoping=scoping, server=server_type
     )
-    norm_fc = core.operators.math.norm_fc(res_op, server=server_type)
-    minmax_op = core.operators.min_max.min_max_fc_inc(norm_fc, server=server_type)
+    minmax_op = core.operators.min_max.min_max_fc_inc(res_op, server=server_type)
     return (res_op, minmax_op)
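For intuition about what estimate_size() computes: per the docstrings above, it runs the first operator for a single scoping ID, measures that output's size, and fits as many IDs as possible under the byte budget. A rough standalone sketch of that arithmetic (an assumption drawn from the docstrings, not the library's actual code; single_iteration_bytes is a hypothetical measured value):

    def estimate_chunk_size(single_iteration_bytes: int, max_bytes: int = 1024**3) -> int:
        # Fit as many one-ID iterations as possible under the byte budget,
        # processing at least one ID per chunk.
        return max(1, max_bytes // single_iteration_bytes)

    # Example: a 4 MiB field per time step under a 1 GiB budget -> 256 steps per chunk.
    print(estimate_chunk_size(4 * 1024**2))  # 256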