Added suggestions, docstrings
ansys-akarcher committed Jul 28, 2023
1 parent fe690f3 commit 648dbc3
Showing 2 changed files with 88 additions and 8 deletions.
93 changes: 87 additions & 6 deletions src/ansys/dpf/core/incremental.py
@@ -11,37 +11,78 @@


class IncrementalHelper:
"""Provides an API to transform an existing workflow into an incrementally evaluating one.
It works by plugging operators into an incomplete workflow.
Example
-------
>>> from ansys.dpf import core as dpf
>>> from ansys.dpf.core import examples
>>> path = examples.find_msup_transient()
>>> ds = dpf.DataSources(path)
>>> scoping = dpf.time_freq_scoping_factory.scoping_on_all_time_freqs(ds)
>>>
>>> result_op = dpf.operators.result.displacement(data_sources=ds, time_scoping=scoping)
>>> minmax_op = dpf.operators.min_max.min_max_fc_inc(result_op)
>>>
>>> new_op = dpf.split_workflow_in_chunks(result_op, minmax_op, scoping, chunk_size=5)
>>> min_field = new_op.get_output(0, dpf.types.field)
>>> max_field = new_op.get_output(1, dpf.types.field)
"""

    def __init__(
        self,
        start_op: core.Operator,
        end_op: core.Operator,
        scoping: core.Scoping,
        scoping_pin: int = None,
    ):
"""
"""Constructs an IncrementalHelper object.
Given the first and the last operator of a workflow, as well as the scoping.
This class can be used to simplify the use of incremental operators, and automatically
be enabled to incrementally evaluate a workflow.
Under the constraint that the end_op supports incremental evaluation.
Parameters
----------
start_op : Operator
First operator in the workflow to convert
end_op : Operator
Last operator in the workflow to convert (Operator providing the meaningful output)
scoping : Scoping
Scoping used to chunk the data
scoping_pin : int, optional
Pin number of the scoping on the first operator, otherwise it is deduced with types
"""
        # Input operator should accept a scoping
        # Last operator should support incremental evaluation,
        # but as we don't have a consistent method to check,
        # it should be permissive in case the specification isn't up to date
        self._start_op = start_op
        self._end_op = self._map_to_incremental(end_op)
        self._scoping = scoping
        self._scoping_pin = self._find_scoping_pin(scoping_pin)

    def estimate_size(self, max_bytes: int, _dict_inputs: Dict[int, Any] = {}) -> int:
"""
"""Estimates the chunk size from the estimated number of bytes outputted in one iteration.
Estimation is based on the size of the output for one ID of the given time_scoping,
so it will run the operator for only one iteration.
It only supports Field and FieldContainer.
For other types, you should specify chunk_size argument in the split() method.
Parameters
----------
max_bytes : int
Max allowed size of an output from the first operator, for one iteration (in bytes).
_dict_inputs: dict[int,any]
Dictionary associating pin number to inputs, for evaluating output of one iteration.
"""
        # Evaluate for the first element to try to guess memory consumption
        # It works best with a large number of elements

@@ -107,9 +148,19 @@ def _prerun(self, _dict_inputs: Dict[int, Any]):
    def split(
        self, chunk_size: int, end_input_pin: int = 0, rescope: bool = False
    ) -> core.Operator:
"""
"""Integrate given operators into a new workflow enabling incremental evaluation.
Given a chunk size (multiple of given scoping), it will provide a new operator to retrieve
outputs from, and enable incremental evaluation, notably reducing peak memory usage.
Parameters
----------
chunk_size : int
Number of iterations per run
end_input_pin : int, optional
Pin number of the output to use from the first operator (default = 0)
rescope : bool, optional
Rescope all the outputs based on the given scoping (default = False)
"""
        # Enables incremental evaluation:
        # using for_each, chunk_in_for_each_range and the incremental version of the last operator

@@ -159,6 +210,10 @@ def split(
        return output

    def _map_to_incremental(self, end_op: core.Operator):
        # The goal of this function is to validate that a given operator is indeed incremental.
        # If an operator is found not to support incremental evaluation, the check must not be
        # strict: it should only output warnings, because this function -by design- may be outdated.
        inc_operators = [
"accumulate_level_over_label_fc",
"accumulate_min_over_label_fc",
@@ -221,11 +276,37 @@ def split_workflow_in_chunks(
    scoping_pin: int = None,
    end_input_pin: int = 0,
):
"""
This method helps transform a workflow into an incrementally evaluating one.
"""Transforms a workflow into an incrementally evaluating one.
It wraps in one method the functionality of the IncrementalHelper class as well
as the estimation of the chunk size.
If no chunk_size is specified, the function will attempt to estimate the value
by calling IncrementalHelper.estimate_size(max_bytes, dict_inputs).
If no scoping_pin is specified, the function will attempt to deduce the correct pin,
which would be the first input pin matching a scoping type.
Parameters
----------
start_op : Operator
Initial operator of the workflow to convert
end_op : Operator
Last operator of the workflow to convert
scoping : Scoping
Scoping to split across multiple evaluation
rescope : bool, optional
If enabled, will rescope final outputs with the given scoping (default = False)
max_bytes : int, optional
Max allowed size for the output from the first operator (default = 1024**3)
dict_inputs : dict[int, any], optional
Inputs to pass to the first operator, used only for the estimation run (default = {})
chunk_size = int, optional
Maximum number of scoping elements to process in an iteration (default = None)
scoping_pin : int, optional
The pin number on the first operator to bind the scoping (default = None)
end_input_pin : int, optional
Pin number of the output to use from the first operator(default = 0)
"""
    splitter = IncrementalHelper(start_op, end_op, scoping, scoping_pin)

3 changes: 1 addition & 2 deletions tests/test_incremental.py
@@ -50,8 +50,7 @@ def create_wf():
    res_op = core.operators.result.displacement(
        data_sources=ds, time_scoping=scoping, server=server_type
    )
-    norm_fc = core.operators.math.norm_fc(res_op, server=server_type)
-    minmax_op = core.operators.min_max.min_max_fc_inc(norm_fc, server=server_type)
+    minmax_op = core.operators.min_max.min_max_fc_inc(res_op, server=server_type)

    return (res_op, minmax_op)
