Incremental workflow evaluation #946

Merged: 25 commits from feat/incremental_eval into master on Aug 1, 2023

Commits (25)
da133bf initial working version (ansys-akarcher, May 11, 2023)
134e11d Created working example (ansys-akarcher, May 11, 2023)
bca0d78 Enhanced scoping_on_all_time_freq (ansys-akarcher, May 12, 2023)
9d6986a Added chunk_size estimation and wrapping method (ansys-akarcher, May 12, 2023)
0ba67cf Formatted (ansys-akarcher, May 12, 2023)
e093130 Added basic comments for example (ansys-akarcher, May 12, 2023)
9a4f48a Renaming + minor changes (ansys-akarcher, May 16, 2023)
bed0e83 Added tests for incremental evaluation (ansys-akarcher, May 17, 2023)
f13b8ad Merge branch 'master' into feat/incremental_eval (ansys-akarcher, Jun 1, 2023)
d28ec63 Merge branch 'master' into feat/incremental_eval (ansys-akarcher, Jun 14, 2023)
b146fcd Missing docstring (ansys-akarcher, Jun 14, 2023)
7dae8b4 Add plot in example (ansys-akarcher, Jun 14, 2023)
3a5bd09 fixed style (ansys-akarcher, Jun 14, 2023)
cbda91e Revert "Merge branch 'master' into feat/incremental_eval" (ansys-akarcher, Jun 14, 2023)
a4c4213 Merge branch 'master' into feat/incremental_eval (ansys-akarcher, Jun 20, 2023)
ce34e51 Skip incremental related tests if before 23R2 (ansys-akarcher, Jun 20, 2023)
2b081c4 Apply suggestions from code review (ansys-akarcher, Jun 21, 2023)
d358d89 Merge branch 'master' into feat/incremental_eval (ansys-akarcher, Jun 21, 2023)
9f47932 Added test for chunking estimation (ansys-akarcher, Jun 21, 2023)
b16e1ab Merge branch 'master' into feat/incremental_eval (ansys-akarcher, Jul 5, 2023)
fe690f3 Merge branch 'master' of https://github.com/ansys/pydpf-core into fea… (ansys-akarcher, Jul 27, 2023)
648dbc3 Added suggestions, docstrings (ansys-akarcher, Jul 28, 2023)
b7d0987 Merge branch 'master' of https://github.com/ansys/pydpf-core into fea… (ansys-akarcher, Jul 28, 2023)
803c1e0 Merge branch 'master' into feat/incremental_eval (PProfizi, Aug 1, 2023)
92b2985 Update src/ansys/dpf/core/incremental.py (PProfizi, Aug 1, 2023)
64 changes: 64 additions & 0 deletions examples/04-advanced/14-incremental_evaluation.py
@@ -0,0 +1,64 @@
"""
.. _ref_incremental_evaluation:

Use incremental evaluation helper
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This example shows you how to use the incremental evaluation helper.
"""

# Import necessary modules
from ansys.dpf import core as dpf
from ansys.dpf.core import examples


#######################################################################################
# Retrieve an example result file to instantiate a DataSources object
path = examples.download_transient_result()
ds = dpf.DataSources(path)

# From the DataSources object we can retrieve the scoping.
# In this example we want to compute the min/max over all the time sets.
tf_provider = dpf.operators.metadata.time_freq_provider(data_sources=ds)
tf_support = tf_provider.get_output(output_type=dpf.types.time_freq_support)
scoping = dpf.time_freq_scoping_factory.scoping_on_all_time_freqs(tf_support)

# If you don't need to reuse the TimeFreqSupport, you can also use the DataSources directly:
# scoping = dpf.time_freq_scoping_factory.scoping_on_all_time_freqs(ds)

#######################################################################################
# Define the workflow to convert

# Instantiating a streams_provider is important for incremental evaluation,
# because the operators are re-evaluated multiple times (once per chunk)
streams_provider = dpf.operators.metadata.streams_provider(data_sources=ds)

# Define the main workflow
result_op = dpf.operators.result.stress(
data_sources=ds, time_scoping=scoping, streams_container=streams_provider
)
norm_fc = dpf.operators.math.norm_fc(result_op)
final_op = dpf.operators.min_max.min_max_fc_inc(norm_fc)

#######################################################################################
# Obtain a new operator to retrieve outputs from

# The new workflow is adapted from the first and the last operators of the current workflow.
# The scoping is required to split the workload into chunks.
new_end_op = dpf.split_workflow_in_chunks(result_op, final_op, scoping)
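# With no chunk_size given, split_workflow_in_chunks estimates one from the default
# max_bytes budget (1024**3 bytes) and prints the value it settles on.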


# Obtain results on the same pin numbers
min_field = new_end_op.get_output(0, dpf.types.field)
[Resolved review comment from a Contributor]
@ansys-akarcher same remark as @cbellot000: can't we write
min = new_end_op.outputs.min_field.eval()
or something similar?
max_field = new_end_op.get_output(1, dpf.types.field)

# Plot results
import matplotlib.pyplot as plt

x = tf_support.time_frequencies.data
plt.plot(x, min_field.data, "b", label="Min")
plt.plot(x, max_field.data, "r", label="Max")
plt.xlabel("Time")
plt.ylabel("Stress")
plt.legend()
plt.show()
4 changes: 3 additions & 1 deletion src/ansys/dpf/core/__init__.py
@@ -87,9 +87,11 @@
LicenseContextManager
)
from ansys.dpf.core.unit_system import UnitSystem, unit_systems
from ansys.dpf.core.incremental import IncrementalHelper, split_workflow_in_chunks
from ansys.dpf.core.any import Any
from ansys.dpf.core.mesh_info import MeshInfo
from ansys.dpf.core.generic_data_container import GenericDataContainer
from ansys.dpf.core.any import Any


# for matplotlib
# solves "QApplication: invalid style override passed, ignoring it."
318 changes: 318 additions & 0 deletions src/ansys/dpf/core/incremental.py
@@ -0,0 +1,318 @@
"""
.. _ref_incremental:

Incremental
===========
"""

from ansys.dpf import core

from typing import Any, Dict, Optional


class IncrementalHelper:
"""Provides an API to transform an existing workflow into an incrementally evaluating one.

    It works by plugging chunking and iteration operators into the given workflow.

Example
-------
>>> from ansys.dpf import core as dpf
>>> from ansys.dpf.core import examples
>>> path = examples.find_msup_transient()
>>> ds = dpf.DataSources(path)
>>> scoping = dpf.time_freq_scoping_factory.scoping_on_all_time_freqs(ds)
>>>
>>> result_op = dpf.operators.result.displacement(data_sources=ds, time_scoping=scoping)
>>> minmax_op = dpf.operators.min_max.min_max_fc_inc(result_op)
>>>
>>> new_op = dpf.split_workflow_in_chunks(result_op, minmax_op, scoping, chunk_size=5)
>>> min_field = new_op.get_output(0, dpf.types.field)
>>> max_field = new_op.get_output(1, dpf.types.field)
"""

def __init__(
self,
start_op: core.Operator,
end_op: core.Operator,
scoping: core.Scoping,
        scoping_pin: Optional[int] = None,
):
"""Constructs an IncrementalHelper object.

Given the first and the last operator of a workflow, as well as the scoping.

This class can be used to simplify the use of incremental operators, and automatically
be enabled to incrementally evaluate a workflow.

Under the constraint that the end_op supports incremental evaluation.

Parameters
----------
start_op : Operator
First operator in the workflow to convert
end_op : Operator
Last operator in the workflow to convert (Operator providing the meaningful output)
scoping : Scoping
Scoping used to chunk the data
scoping_pin : int, optional
Pin number of the scoping on the first operator, otherwise it is deduced with types
"""
# Input operator should accept a scoping
# Last operator should support incremental evaluation
# but as we don't have a consistent method to check,
# it should be permissive in the case the specification isn't up to date
self._start_op = start_op
self._end_op = self._map_to_incremental(end_op)

self._scoping = scoping
self._scoping_pin = self._find_scoping_pin(scoping_pin)

    def estimate_size(
        self, max_bytes: int, _dict_inputs: Optional[Dict[int, Any]] = None
    ) -> int:
"""Estimates the chunk size from the estimated number of bytes outputted in one iteration.

Estimation is based on the size of the output for one ID of the given time_scoping,
so it will run the operator for only one iteration.

It only supports Field and FieldContainer.
For other types, you should specify chunk_size argument in the split() method.

Parameters
----------
max_bytes : int
Max allowed size of an output from the first operator, for one iteration (in bytes).
        _dict_inputs : dict[int, Any], optional
            Dictionary associating pin number to inputs, for evaluating the output of one iteration.
"""
        # Avoid mutating a dict shared with the caller (and avoid a mutable default)
        _dict_inputs = dict(_dict_inputs) if _dict_inputs else {}
        # Evaluate for the first element to try to guess memory consumption
        # It is best used with a lot of elements
        first_id = self._scoping.ids[0]
        srv = self._scoping._server
        loc = self._scoping.location
        _dict_inputs[self._scoping_pin] = core.Scoping(server=srv, ids=[first_id], location=loc)

outputs = self._prerun(_dict_inputs)

_outputs = outputs._outputs
data = map(lambda o: o.get_data(), _outputs)
        # Sizes of all outputs for one iteration
sizes = map(lambda obj: self._compute_size(obj), data)

# Total size for one ID in the scoping
size_for_one = sum(sizes)
# total_size = size_for_one * self._scoping.size

num_iter = int(max_bytes / size_for_one)
num_iter = min(max(num_iter, 1), self._scoping.size) # clamp(num_iter, 1, scoping size)
return num_iter # math.gcd(num_iter, self._scoping.size)
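    # A minimal usage sketch for the estimation path (names are illustrative):
    #   helper = IncrementalHelper(result_op, minmax_op, scoping)
    #   chunk_size = helper.estimate_size(max_bytes=1024**3)  # ~1 GiB budget per iteration
    #   new_op = helper.split(chunk_size)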

def _compute_size(self, obj):
if isinstance(obj, core.FieldsContainer):
fc = obj
return self._compute_size(fc[0])
elif isinstance(obj, core.Field):
field = obj
# Double = 8 bytes assumption
return field.size * 8

        raise NotImplementedError(
            f"Size estimation is not supported for {type(obj).__name__}; "
            "specify chunk_size in split() instead."
        )

def _prerun(self, _dict_inputs: Dict[int, Any]):
""""""

for pin_idx, val in _dict_inputs.items():
self._start_op.connect(pin_idx, val)
self._start_op.run()
return self._start_op.outputs

# Transforms a user workflow:
#
# +----------+ +---------------+ +---------+
# scoping ->| start_op | -> | middle ops... | -> | end_op | ->
# +----------+ +---------------+ +---------+
#
# Into a new workflow like this:
#
# +----------+ +---------------+ +---------+
# scoping ->| start_op | -> | middle ops... | -> | end_op |
# \ +----------+ +---------------+ +---------+
# \ \ |
# \ \ +------------------+ | (pins remaps)
# \ \> | | | +----------+ +-----------+
# \ scop_pin -> | chunk_in | +-> | | -> | forward | -> final
# +----------> | for_each_range | iterables | | | (new end) | outputs
# chunk_size -> | | -----------> | for_each | +-----------+
# +------------------+ | |
# end_input_pin--> | |
# +----------+
def split(
self, chunk_size: int, end_input_pin: int = 0, rescope: bool = False
) -> core.Operator:
"""Integrate given operators into a new workflow enabling incremental evaluation.

Given a chunk size (multiple of given scoping), it will provide a new operator to retrieve
outputs from, and enable incremental evaluation, notably reducing peak memory usage.

Parameters
----------
chunk_size : int
Number of iterations per run
end_input_pin : int, optional
Pin number of the output to use from the first operator (default = 0)
rescope : bool, optional
Rescope all the outputs based on the given scoping (default = False)
"""
        # Enables incremental evaluation by using for_each, chunk_in_for_each_range and the
        # incremental version of the last operator, then returns a forward operator whose
        # pins are remapped to the outputs of the original end operator.

_server = self._start_op._server

for_each = core.Operator("for_each", server=_server)
split_in_range = core.Operator("chunk_in_for_each_range", server=_server)
forward = core.Operator("forward", server=_server)

split_in_range.connect_operator_as_input(1, self._start_op)
split_in_range.connect(2, self._scoping_pin)
split_in_range.connect(3, self._scoping)
split_in_range.connect(4, chunk_size)

for_each.connect(0, split_in_range, 0)
for_each.connect(2, end_input_pin)

# connect inputs
dict_outputs = core.Operator.operator_specification(
op_name=self._end_op.name, server=_server
).outputs
if not dict_outputs:
# temporary patch for incremental:: operators
dict_outputs = {0: None}

fe_pin_idx = 3 # see doc of for_each
for pin_idx in dict_outputs.keys():
# connect end_op to for_each
for_each.connect(fe_pin_idx, self._end_op, pin_idx)
# remap
forward.connect(pin_idx, for_each, fe_pin_idx)
fe_pin_idx += 1

output = forward

        if rescope:
            new_forward = core.Operator("forward", server=_server)
            for pin_idx in dict_outputs.keys():
                # Use a distinct name to avoid shadowing the `rescope` flag
                rescope_op = core.Operator("Rescope", server=_server)
                rescope_op.connect(0, forward, pin_idx)
                rescope_op.connect(1, self._scoping)
                new_forward.connect(pin_idx, rescope_op, 0)

            output = new_forward
return output

def _map_to_incremental(self, end_op: core.Operator):
        # The goal of this function is to validate that a given operator is indeed incremental.
        # If an operator is found not to support incremental evaluation, the check must not be
        # strict: it should only emit warnings, because this list -by design- may be outdated.
inc_operators = [
"accumulate_level_over_label_fc",
"accumulate_min_over_label_fc",
"accumulate_over_label_fc",
"average_over_label_fc",
"min_max_inc",
"min_max_fc_inc",
"max_over_time_by_entity",
"min_max_over_time_by_entity",
"min_max_by_time",
"min_over_time_by_entity",
"time_of_max_by_entity",
"time_of_min_by_entity",
"incremental::merge::property_field",
"incremental::merge::mesh",
"incremental::merge::field",
"incremental::merge::fields_container",
]

map_to_inc = {"min_max": "min_max_inc", "min_max_fc": "min_max_fc_inc"}

if end_op.name not in inc_operators:
print(f"WARNING: Operator named {end_op.name} may not support incremental evaluation")
if end_op.name in map_to_inc.keys():
print(
f"An operator named {map_to_inc[end_op.name]} supports incremental evaluation"
)

if "incremental" in end_op.config.available_config_options:
end_op.config.set_config_option("incremental", True)

return end_op
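    # Sketch of the equivalent manual opt-in, assuming the operator exposes the
    # "incremental" config option (as the operators listed above may on supporting servers):
    #   op = dpf.operators.min_max.min_max_fc_inc()
    #   if "incremental" in op.config.available_config_options:
    #       op.config.set_config_option("incremental", True)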

def _find_scoping_pin(self, pin_idx):
dict_inputs = self._start_op.inputs._dict_inputs
# validate given pin_idx
        if pin_idx is not None and pin_idx in dict_inputs:
pin_spec = dict_inputs[pin_idx]
if "scoping" in pin_spec.type_names:
return pin_idx

# look for scoping pin
for pin_idx, spec in dict_inputs.items():
if "scoping" in spec.type_names:
return pin_idx

raise Exception(
f"Scoping pin could not be found in start_op with name '{self._start_op.name}'"
)


def split_workflow_in_chunks(
    start_op: core.Operator,
    end_op: core.Operator,
    scoping: core.Scoping,
    rescope: bool = False,
    max_bytes: int = 1024**3,
    dict_inputs: Optional[Dict[int, Any]] = None,
    chunk_size: Optional[int] = None,
    scoping_pin: Optional[int] = None,
    end_input_pin: int = 0,
):
"""Transforms a workflow into an incrementally evaluating one.

It wraps in one method the functionality of the IncrementalHelper class as well
as the estimation of the chunk size.

If no chunk_size is specified, the function will attempt to estimate the value
by calling IncrementalHelper.estimate_size(max_bytes, dict_inputs).

If no scoping_pin is specified, the function will attempt to deduce the correct pin,
which would be the first input pin matching a scoping type.

Parameters
----------
start_op : Operator
Initial operator of the workflow to convert
end_op : Operator
Last operator of the workflow to convert
scoping : Scoping
Scoping to split across multiple evaluation
rescope : bool, optional
If enabled, will rescope final outputs with the given scoping (default = False)
max_bytes : int, optional
Max allowed size for the output from the first operator (default = 1024**3)
    dict_inputs : dict[int, Any], optional
        Inputs to pass to the first operator, used only for the estimation run (default = None)
    chunk_size : int, optional
        Maximum number of scoping elements to process in an iteration (default = None)
    scoping_pin : int, optional
        The pin number on the first operator to bind the scoping (default = None)
    end_input_pin : int, optional
        Pin number of the output to use from the first operator (default = 0)
"""
splitter = IncrementalHelper(start_op, end_op, scoping, scoping_pin)

    if chunk_size is None:
print(f"Estimating chunk_size with max_bytes: {max_bytes}")
chunk_size = splitter.estimate_size(max_bytes, dict_inputs)
print(f"Done. chunk_size set to {chunk_size} (scoping size: {scoping.size})")

return splitter.split(chunk_size, end_input_pin, rescope)
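
For reference, a minimal end-to-end sketch of split_workflow_in_chunks with an explicit
max_bytes budget and rescoping enabled. It reuses the file and operator names from the
IncrementalHelper docstring example above; the 512 MiB budget is an arbitrary illustration.

from ansys.dpf import core as dpf
from ansys.dpf.core import examples

ds = dpf.DataSources(examples.find_msup_transient())
scoping = dpf.time_freq_scoping_factory.scoping_on_all_time_freqs(ds)

result_op = dpf.operators.result.displacement(data_sources=ds, time_scoping=scoping)
minmax_op = dpf.operators.min_max.min_max_fc_inc(result_op)

# Estimate chunk_size so one iteration outputs at most ~512 MiB,
# then rescope the final outputs onto the full time scoping.
new_op = dpf.split_workflow_in_chunks(
    result_op, minmax_op, scoping, max_bytes=512 * 1024**2, rescope=True
)
min_field = new_op.get_output(0, dpf.types.field)
max_field = new_op.get_output(1, dpf.types.field)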