diff --git a/src/ansys/fluent/core/__init__.py b/src/ansys/fluent/core/__init__.py
index cbac5d85f6a..e4e13398cce 100644
--- a/src/ansys/fluent/core/__init__.py
+++ b/src/ansys/fluent/core/__init__.py
@@ -102,9 +102,12 @@ def version_info() -> str:
 # Whether to use datamodel attribute caching
 DATAMODEL_USE_ATTR_CACHE = True
 
-# Whether stream and cache commands state
+# Whether to stream and cache commands state
 DATAMODEL_USE_NOCOMMANDS_DIFF_STATE = True
 
+# Whether to return the state changes on mutating datamodel rpcs
+DATAMODEL_RETURN_STATE_CHANGES = True
+
 # Whether to use remote gRPC file transfer service
 USE_FILE_TRANSFER_SERVICE = False
 
diff --git a/src/ansys/fluent/core/data_model_cache.py b/src/ansys/fluent/core/data_model_cache.py
index 06bae80a699..cec7fe70e60 100644
--- a/src/ansys/fluent/core/data_model_cache.py
+++ b/src/ansys/fluent/core/data_model_cache.py
@@ -8,6 +8,7 @@ from typing import Any, Dict, List
 
 from ansys.api.fluent.v0.variant_pb2 import Variant
+from ansys.fluent.core.utils.fluent_version import FluentVersion
 
 StateType = (
     bool
@@ -101,6 +102,31 @@ def update(self, d: dict[str, Any], d1: dict[str, Any]):
                 d[k] = v1
 
 
+def _is_dict_parameter_type(version: FluentVersion, rules: str, rules_path: str):
+    """Check if a parameter is a dict type."""
+    from ansys.fluent.core import CODEGEN_OUTDIR
+    from ansys.fluent.core.services.datamodel_se import (
+        PyDictionary,
+        PyNamedObjectContainer,
+        PyParameter,
+    )
+    from ansys.fluent.core.utils import load_module
+
+    module = load_module(
+        rules, CODEGEN_OUTDIR / f"datamodel_{version.number}" / f"{rules}.py"
+    )
+    cls = module.Root
+    comps = rules_path.split("/")
+    for i, comp in enumerate(comps):
+        if hasattr(cls, comp):
+            cls = getattr(cls, comp)
+            if issubclass(cls, PyParameter) and i < len(comps) - 1:
+                return False
+            if issubclass(cls, PyNamedObjectContainer):
+                cls = getattr(cls, f"_{comp}")
+    return issubclass(cls, PyDictionary)
+
+
 class DataModelCache:
     """Class to manage datamodel cache."""
 
@@ -177,6 +203,8 @@ def _update_cache_from_variant_state(
         key: str,
         state: Variant,
         updaterFn,
+        rules_str: str,
+        version,
     ):
         if state.HasField("bool_state"):
             updaterFn(source, key, state.bool_state)
@@ -198,7 +226,13 @@ def _update_cache_from_variant_state(
             updaterFn(source, key, [])
             for item in state.variant_vector_state.item:
                 self._update_cache_from_variant_state(
-                    rules, source, key, item, lambda d, k, v: d[k].append(v)
+                    rules,
+                    source,
+                    key,
+                    item,
+                    lambda d, k, v: d[k].append(v),
+                    rules_str + "/" + key.split(":", maxsplit=1)[0],
+                    version,
                 )
         elif state.HasField("variant_map_state"):
             internal_names_as_keys = (
@@ -226,15 +260,28 @@ def _update_cache_from_variant_state(
             else:
                 if key not in source:
                     source[key] = {}
-                source = source[key]
-                for k, v in state.variant_map_state.item.items():
-                    self._update_cache_from_variant_state(
-                        rules, source, k, v, dict.__setitem__
-                    )
+                if version and _is_dict_parameter_type(version, rules, rules_str):
+                    source[key] = {}
+                if state.variant_map_state.item:
+                    source = source[key]
+                    for k, v in state.variant_map_state.item.items():
+                        self._update_cache_from_variant_state(
+                            rules,
+                            source,
+                            k,
+                            v,
+                            dict.__setitem__,
+                            rules_str + "/" + k.split(":", maxsplit=1)[0],
+                            version,
+                        )
+                else:
+                    source[key] = {}
         else:
             updaterFn(source, key, None)
 
-    def update_cache(self, rules: str, state: Variant, deleted_paths: List[str]):
+    def update_cache(
+        self, rules: str, state: Variant, deleted_paths: List[str], version=None
+    ):
         """Update datamodel cache from streamed state.
 
         Parameters
@@ -245,6 +292,8 @@ def update_cache(self, rules: str, state: Variant, deleted_paths: List[str]):
             streamed state
         deleted_paths : List[str]
             list of deleted paths
+        version : FluentVersion, optional
+            Fluent version
         """
         cache = self.rules_str_to_cache[rules]
         with self._with_lock(rules):
@@ -280,7 +329,13 @@ def update_cache(self, rules: str, state: Variant, deleted_paths: List[str]):
                         break
             for k, v in state.variant_map_state.item.items():
                 self._update_cache_from_variant_state(
-                    rules, cache, k, v, dict.__setitem__
+                    rules,
+                    cache,
+                    k,
+                    v,
+                    dict.__setitem__,
+                    k.split(":", maxsplit=1)[0],
+                    version,
                 )
 
     @staticmethod
diff --git a/src/ansys/fluent/core/services/datamodel_se.py b/src/ansys/fluent/core/services/datamodel_se.py
index 1998c6f656c..eb4f9fb88da 100644
--- a/src/ansys/fluent/core/services/datamodel_se.py
+++ b/src/ansys/fluent/core/services/datamodel_se.py
@@ -25,6 +25,7 @@
 )
 from ansys.fluent.core.services.streaming import StreamingService
 from ansys.fluent.core.solver.error_message import allowed_name_error_message
+from ansys.fluent.core.utils.fluent_version import FluentVersion
 
 Path = list[tuple[str, str]]
 _TValue = None | bool | int | float | str | Sequence["_TValue"] | dict[str, "_TValue"]
@@ -452,6 +453,7 @@ def __init__(
         self,
         channel: grpc.Channel,
         metadata: list[tuple[str, str]],
+        version: FluentVersion,
         fluent_error_state,
         file_transfer_service: Any | None = None,
     ) -> None:
@@ -465,6 +467,7 @@ def __init__(
         self.subscriptions = SubscriptionList()
         self.file_transfer_service = file_transfer_service
         self.cache = DataModelCache() if pyfluent.DATAMODEL_USE_STATE_CACHE else None
+        self.version = version
 
     def get_attribute_value(self, rules: str, path: str, attribute: str) -> _TValue:
         """Get attribute value."""
@@ -495,7 +498,14 @@ def rename(self, rules: str, path: str, new_name: str) -> None:
         request.path = path
         request.new_name = new_name
         request.wait = True
-        self._impl.rename(request)
+        response = self._impl.rename(request)
+        if self.cache is not None:
+            self.cache.update_cache(
+                rules,
+                response.state,
+                response.deletedpaths,
+                version=self.version,
+            )
 
     def delete_child_objects(
         self, rules: str, path: str, obj_type: str, child_names: list[str]
@@ -507,7 +517,14 @@ def delete_child_objects(
         for name in child_names:
             request.child_names.names.append(name)
         request.wait = True
-        self._impl.delete_child_objects(request)
+        response = self._impl.delete_child_objects(request)
+        if self.cache is not None:
+            self.cache.update_cache(
+                rules,
+                response.state,
+                response.deletedpaths,
+                version=self.version,
+            )
 
     def delete_all_child_objects(self, rules: str, path: str, obj_type: str) -> None:
         """Delete all child objects."""
@@ -516,7 +533,14 @@ def delete_all_child_objects(self, rules: str, path: str, obj_type: str) -> None
         request.path = path + "/" + obj_type
         request.delete_all = True
         request.wait = True
-        self._impl.delete_child_objects(request)
+        response = self._impl.delete_child_objects(request)
+        if self.cache is not None:
+            self.cache.update_cache(
+                rules,
+                response.state,
+                response.deletedpaths,
+                version=self.version,
+            )
 
     def set_state(self, rules: str, path: str, state: _TValue) -> None:
         """Set state."""
@@ -524,14 +548,28 @@ def set_state(self, rules: str, path: str, state: _TValue) -> None:
             rules=rules, path=path, wait=True
         )
         _convert_value_to_variant(state, request.state)
-        self._impl.set_state(request)
+        response = self._impl.set_state(request)
+        if self.cache is not None:
+            self.cache.update_cache(
+                rules,
+                response.state,
+                response.deletedpaths,
+                version=self.version,
+            )
 
     def fix_state(self, rules, path) -> None:
         """Fix state."""
         request = DataModelProtoModule.FixStateRequest()
         request.rules = rules
         request.path = convert_path_to_se_path(path)
-        self._impl.fix_state(request)
+        response = self._impl.fix_state(request)
+        if self.cache is not None:
+            self.cache.update_cache(
+                rules,
+                response.state,
+                response.deletedpaths,
+                version=self.version,
+            )
 
     def update_dict(
         self, rules: str, path: str, dict_state: dict[str, _TValue]
@@ -541,14 +579,28 @@ def update_dict(
             rules=rules, path=path, wait=True
         )
         _convert_value_to_variant(dict_state, request.dicttomerge)
-        self._impl.update_dict(request)
+        response = self._impl.update_dict(request)
+        if self.cache is not None:
+            self.cache.update_cache(
+                rules,
+                response.state,
+                response.deletedpaths,
+                version=self.version,
+            )
 
     def delete_object(self, rules: str, path: str) -> None:
         """Delete an object."""
         request = DataModelProtoModule.DeleteObjectRequest(
             rules=rules, path=path, wait=True
         )
-        self._impl.delete_object(request)
+        response = self._impl.delete_object(request)
+        if self.cache is not None:
+            self.cache.update_cache(
+                rules,
+                response.state,
+                response.deletedpaths,
+                version=self.version,
+            )
 
     def execute_command(
         self, rules: str, path: str, command: str, args: dict[str, _TValue]
@@ -559,6 +611,13 @@ def execute_command(
         )
         _convert_value_to_variant(args, request.args)
         response = self._impl.execute_command(request)
+        if self.cache is not None:
+            self.cache.update_cache(
+                rules,
+                response.state,
+                response.deletedpaths,
+                version=self.version,
+            )
         return _convert_variant_to_value(response.result)
 
     def execute_query(
diff --git a/src/ansys/fluent/core/session.py b/src/ansys/fluent/core/session.py
index 26593f7bbd5..f06cabdd433 100644
--- a/src/ansys/fluent/core/session.py
+++ b/src/ansys/fluent/core/session.py
@@ -147,6 +147,7 @@ def _build_from_fluent_connection(
         self._datamodel_service_se = service_creator("datamodel").create(
             fluent_connection._channel,
             fluent_connection._metadata,
+            self.get_fluent_version(),
             self._error_state,
             self._file_transfer_service,
         )
diff --git a/src/ansys/fluent/core/session_pure_meshing.py b/src/ansys/fluent/core/session_pure_meshing.py
index 7c44df0b257..f3932df2a55 100644
--- a/src/ansys/fluent/core/session_pure_meshing.py
+++ b/src/ansys/fluent/core/session_pure_meshing.py
@@ -89,7 +89,9 @@ def __init__(
             stream = DatamodelStream(datamodel_service_se)
             stream.register_callback(
                 functools.partial(
-                    datamodel_service_se.cache.update_cache, rules=rules
+                    datamodel_service_se.cache.update_cache,
+                    rules=rules,
+                    version=datamodel_service_se.version,
                 )
             )
             self.datamodel_streams[rules] = stream
diff --git a/src/ansys/fluent/core/streaming_services/datamodel_streaming.py b/src/ansys/fluent/core/streaming_services/datamodel_streaming.py
index 4dd74401a14..c8206a3815a 100644
--- a/src/ansys/fluent/core/streaming_services/datamodel_streaming.py
+++ b/src/ansys/fluent/core/streaming_services/datamodel_streaming.py
@@ -1,8 +1,15 @@
 """Provides a module for datamodel streaming."""
 
+import logging
+
+from google.protobuf.json_format import MessageToDict
+
 from ansys.api.fluent.v0 import datamodel_se_pb2
+import ansys.fluent.core as pyfluent
 from ansys.fluent.core.streaming_services.streaming import StreamingService
 
+network_logger: logging.Logger = logging.getLogger("pyfluent.networking")
+
 
 class DatamodelStream(StreamingService):
     """Encapsulates a datamodel streaming service."""
@@ -28,6 +35,7 @@ def _process_streaming(
         """Processes datamodel events."""
         data_model_request = datamodel_se_pb2.DataModelRequest(*args, **kwargs)
         data_model_request.rules = rules
+        data_model_request.returnstatechanges = pyfluent.DATAMODEL_RETURN_STATE_CHANGES
         if no_commands_diff_state:
             data_model_request.diffstate = datamodel_se_pb2.DIFFSTATE_NOCOMMANDS
         responses = self._streaming_service.begin_streaming(
@@ -39,6 +47,9 @@ def _process_streaming(
         while True:
             try:
                 response: datamodel_se_pb2.DataModelResponse = next(responses)
+                network_logger.debug(
+                    f"GRPC_TRACE: RPC = /grpcRemoting.DataModel/BeginStreaming, response = {MessageToDict(response)}"
+                )
                 with self._lock:
                     self._streaming = True
                     for _, cb_list in self._service_callbacks.items():
diff --git a/src/ansys/fluent/core/workflow.py b/src/ansys/fluent/core/workflow.py
index 74ea98a4091..3879d59fe2d 100644
--- a/src/ansys/fluent/core/workflow.py
+++ b/src/ansys/fluent/core/workflow.py
@@ -11,6 +11,7 @@
 from ansys.fluent.core.services.datamodel_se import (
     PyCallableStateObject,
     PyCommand,
+    PyMenu,
     PyMenuGeneric,
     PySingletonCommandArgumentsSubItem,
 )
@@ -149,11 +150,13 @@ def _convert_task_list_to_display_names(workflow_root, task_list):
         return [workflow_state[f"TaskObject:{x}"]["_name_"] for x in task_list]
     else:
         _display_names = []
-        _org_path = workflow_root.path
         for _task_name in task_list:
-            workflow_root.path = [("TaskObject", _task_name), ("_name_", "")]
-            _display_names.append(workflow_root())
-        workflow_root.path = _org_path
+            name_obj = PyMenu(
+                service=workflow_root.service,
+                rules=workflow_root.rules,
+                path=[("TaskObject", _task_name), ("_name_", "")],
+            )
+            _display_names.append(name_obj())
         return _display_names
 
 
@@ -530,9 +533,8 @@ def _insert_next_task(self, task_name: str):
             raise ValueError(
                 f"'{task_name}' cannot be inserted next to '{self.python_name()}'."
             )
-        return self._task.InsertNextTask(
-            CommandName=self._python_task_names_map[task_name]
-        )
+        self._task.InsertNextTask(CommandName=self._python_task_names_map[task_name])
+        _call_refresh_task_accessors(self._command_source)
 
     @property
     def insertable_tasks(self):
@@ -570,7 +572,9 @@ def __repr__(self):
     def __call__(self, **kwds) -> Any:
         if kwds:
             self._task.Arguments.set_state(**kwds)
-        return self._task.Execute()
+        result = self._task.Execute()
+        _call_refresh_task_accessors(self._command_source)
+        return result
 
     def _tasks_with_matching_attributes(self, attr: str, other_attr: str) -> list:
         this_command = self._command()
diff --git a/tests/test_data_model_cache.py b/tests/test_data_model_cache.py
index 687fa89f638..36927e222cd 100644
--- a/tests/test_data_model_cache.py
+++ b/tests/test_data_model_cache.py
@@ -45,7 +45,7 @@ def test_data_model_cache():
         ({"r1": {}}, "r1", {"A": [3.0, 6.0]}, [], {"r1": {"A": [3.0, 6.0]}}),
         ({"r1": {}}, "r1", {"A": ["ab", "cd"]}, [], {"r1": {"A": ["ab", "cd"]}}),
         ({"r1": {"A": {}}}, "r1", {"A": {"B": 5}}, [], {"r1": {"A": {"B": 5}}}),
-        ({"r1": {"A": 5}}, "r1", {"A": {}}, [], {"r1": {"A": 5}}),
+        ({"r1": {"A": 5}}, "r1", {"A": {}}, [], {"r1": {"A": {}}}),
         ({"r1": {"A": 5}}, "r1", {"A": None}, [], {"r1": {"A": None}}),
         (
             {"r1": {"A": {}}},
diff --git a/tests/test_datamodel_service.py b/tests/test_datamodel_service.py
index 1e80c536fea..ec347596eb2 100644
--- a/tests/test_datamodel_service.py
+++ b/tests/test_datamodel_service.py
@@ -141,12 +141,12 @@ def test_add_on_changed(new_meshing_session):
     assert isinstance(task_list(), list)
     assert len(task_list()) == 0
     data = []
-    subscription = task_list.add_on_changed(lambda obj: data.append(len(obj())))
+    subscription = task_list.add_on_changed(lambda obj: data.append(True))
     assert data == []
     meshing.workflow.InitializeWorkflow(WorkflowType="Watertight Geometry")
     sleep(5)
     assert len(data) > 0
-    assert data[-1] > 0
+    assert len(task_list()) > 0
     data.clear()
     subscription.unsubscribe()
     meshing.workflow.InitializeWorkflow(WorkflowType="Fault-tolerant Meshing")
@@ -508,7 +508,7 @@ def test_command_creation_inside_singleton(new_meshing_session):
 
 
 @pytest.mark.codegen_required
-def test_read_ony_set_state(new_meshing_session):
+def test_read_only_set_state(new_meshing_session):
     meshing = new_meshing_session
     meshing.preferences.MeshingWorkflow.SaveCheckpointFiles = True
     assert meshing.preferences.MeshingWorkflow.CheckpointingOption.is_read_only()
diff --git a/tests/test_new_meshing_workflow.py b/tests/test_new_meshing_workflow.py
index 3f56b2a19cf..fa2d46c9818 100644
--- a/tests/test_new_meshing_workflow.py
+++ b/tests/test_new_meshing_workflow.py
@@ -763,7 +763,6 @@ def test_watertight_workflow_dynamic_interface(
             "",
         ]
     )
-    watertight.add_boundary_layer.insertable_tasks.create_volume_mesh.insert()
     assert "create_volume_mesh" in watertight.task_names()
     create_volume_mesh = watertight.create_volume_mesh
@@ -1362,7 +1361,7 @@ def test_duplicate_tasks_in_workflow(new_meshing_session):
     watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()
     watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()
     watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()
-    assert watertight.task_names() == [
+    assert set(watertight.task_names()) == {
         "import_geometry",
         "create_surface_mesh",
         "describe_geometry",
@@ -1377,7 +1376,7 @@ def test_duplicate_tasks_in_workflow(new_meshing_session):
         "import_boi_geometry",
         "import_boi_geometry_1",
         "import_boi_geometry_2",
-    ]
+    }
     assert watertight.import_boi_geometry_1.arguments()
@@ -1732,3 +1731,23 @@ def test_scenario_with_common_python_names_from_fdl(new_meshing_session):
     two_dimensional = meshing.two_dimensional_meshing()
     # Check if all task names are unique.
     assert len(two_dimensional.task_names()) == len(set(two_dimensional.task_names()))
+
+
+@pytest.mark.skip("Failing in GitHub")
+@pytest.mark.codegen_required
+@pytest.mark.fluent_version(">=25.1")
+def test_return_state_changes(new_meshing_session):
+    meshing = new_meshing_session
+    wt = meshing.watertight()
+
+    import_file_name = examples.download_file(
+        "mixing_elbow.pmdb", "pyfluent/mixing_elbow"
+    )
+
+    wt.import_geometry.file_name.set_state(import_file_name)
+
+    # trigger creation of downstream task when this task is updated:
+    wt.describe_geometry.multizone.set_state("Yes")
+    wt.describe_geometry()
+
+    assert wt.add_multizone_controls
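
Note on the new flag: `DATAMODEL_RETURN_STATE_CHANGES` defaults to `True` and is read in `datamodel_streaming.py` when the `DataModelRequest` is built, so mutating datamodel RPCs (`set_state`, `rename`, `delete_object`, `execute_command`, ...) now carry their state changes back in the response, which the cache consumes. A minimal sketch of opting out; the flag is real (added in `__init__.py` above), while the session launch below is an assumed usage context, not part of the patch:

```python
# Sketch: disable response-driven state changes before creating a session.
# Launching a local meshing session here is illustrative only.
import ansys.fluent.core as pyfluent

pyfluent.DATAMODEL_RETURN_STATE_CHANGES = False  # default is True

meshing = pyfluent.launch_fluent(mode="meshing")
```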
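The updated expectation in `tests/test_data_model_cache.py` (`{"A": 5}` merged with `{"A": {}}` now yields `{"A": {}}` rather than keeping the stale `5`) is the observable effect of the `_is_dict_parameter_type` check. A self-contained toy of that merge rule, with plain dicts standing in for `Variant` states and a hypothetical `merge_state` helper in place of `DataModelCache._update_cache_from_variant_state`:

```python
# Toy model of the cache merge rule; merge_state is a hypothetical stand-in
# for the real variant-based update, using plain dicts throughout.
def merge_state(cache: dict, state: dict, is_dict_parameter: bool) -> None:
    for key, value in state.items():
        if isinstance(value, dict):
            # Dict-typed parameters are overwritten outright, so an empty
            # incoming dict clears any stale scalar under the same key.
            if is_dict_parameter or not isinstance(cache.get(key), dict):
                cache[key] = {}
            if value:
                merge_state(cache[key], value, is_dict_parameter)
        else:
            cache[key] = value


cache = {"A": 5}
merge_state(cache, {"A": {}}, is_dict_parameter=True)
assert cache == {"A": {}}  # previously the cache kept {"A": 5}
```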
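The `network_logger.debug` call added in `datamodel_streaming.py` emits one `GRPC_TRACE` line per streamed `DataModelResponse` on the `pyfluent.networking` logger. A sketch of surfacing those lines with the standard `logging` module; the handler setup is an assumption, not part of the patch:

```python
# Route the pyfluent networking logger to the console at DEBUG level.
import logging

logging.basicConfig(format="%(name)s %(levelname)s: %(message)s")
logging.getLogger("pyfluent.networking").setLevel(logging.DEBUG)
```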