Skip to content

Commit

Permalink
Refactor event creation API
Browse files Browse the repository at this point in the history
- Add new BehaviorSettings class to Workspace
- Event names are created dynamically from settings
  • Loading branch information
cogu committed Oct 2, 2024
1 parent 682c59c commit 330367c
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 41 deletions.
162 changes: 147 additions & 15 deletions src/autosar/xml/element.py
Original file line number Diff line number Diff line change
Expand Up @@ -2930,13 +2930,51 @@ def ref(self) -> PackageRef:
return None if ref_str is None else PackageRef(ref_str)


class BehaviorSettings:
"""
Enables users to customize settings in SwcInternalBehavior such as naming conventions.
"""

def __init__(self) -> None:

# Default prefix used for generating event names
self.background_event_prefix: str | None = None # BackgroundEvent prefix
self.data_receive_error_event_prefix: str | None = None # DataReceiveErrorEvent prefix
self.data_receive_event_prefix: str | None = None # DataReceivedEvent prefix
self.init_event_prefix: str | None = None # InitEvent prefix
self.operation_invoked_event_prefix: str | None = None # OperationInvokedEvent prefix
self.swc_mode_manager_error_event_prefix: str | None = None # SwcModeManagerErrorEvent prefix
self.swc_mode_switch_event_prefix: str | None = None # SwcModeSwitchEvent prefix
self.timing_event_prefix: str | None = None # TimingEvent prefix

def set_value(self, name: str, value: str):
"""
Updates a single value with error check
"""
if hasattr(self, name):
if not isinstance(value, str):
raise TypeError(f"value: Expected string type. Got {str(type(value))}")
setattr(self, name, value)
else:
raise KeyError(f"name: Invalid name '{name}'")

def update(self, value_map: dict[str, str]):
"""
Updates multiple values using keys in value_map, with error-check
"""
for name, value in value_map.items():
self.set_value(name, value)


class PackageCollection:
"""
Base class that maintains a collection of AUTOSAR packages
"""

def __init__(self, packages: list[Package] | None = None) -> None:
def __init__(self, packages: list[Package] | None = None,
behavior_settings: BehaviorSettings | None = None) -> None:
self.parent = None
self.behavior_settings = behavior_settings
self.packages: list[Package] = [] # .PACKAGES
self._package_dict = {} # internal package map
if packages is not None:
Expand Down Expand Up @@ -6304,6 +6342,16 @@ def _get_valid_parent(self) -> SwComponentType:
raise RuntimeError("Behavior object doesn't have a valid parent")
return self.parent

def _get_valid_behavior_settings(self) -> BehaviorSettings:
"""
Verifies that the root collection has a valid behavior_settings object and returns it
"""
swc = self._get_valid_parent()
workspace = swc.root_collection()
if not isinstance(workspace.behavior_settings, BehaviorSettings):
raise TypeError("Root collection doesn't seem to be a valid workspace")
return workspace.behavior_settings

def ref(self) -> SwcInternalBehaviorRef | None:
"""
Returns a reference to this element or
Expand Down Expand Up @@ -6377,8 +6425,8 @@ def _make_unique_event_name(self, event_name: str) -> str:
return event_name

def create_background_event(self,
event_name: str,
runnable_name: str,
event_name: str | None = None,
**kwargs
) -> BackgroundEvent:
"""
Expand All @@ -6387,15 +6435,24 @@ def create_background_event(self,
runnable = self.find_runnable(runnable_name)
if runnable is None:
raise KeyError(f"Found no runnable with name '{runnable_name}'")
if event_name is None:
behavior_settings = self._get_valid_behavior_settings()
if behavior_settings.background_event_prefix:
event_name = behavior_settings.background_event_prefix + runnable_name
else:
msg = "event_name: Unable to dynamically create event name,"\
" background_event_prefix is not set in behavior settings"
raise RuntimeError(msg)
assert isinstance(event_name, str)
unique_event_name = self._make_unique_event_name(event_name)
event = BackgroundEvent(unique_event_name, runnable.ref(), **kwargs)
self.append_event(event)
return event

def create_data_receive_error_event(self,
event_name: str,
runnable_name: str,
port_data_element: str,
event_name: str | None = None,
**kwargs
) -> DataReceiveErrorEvent:
"""
Expand All @@ -6408,7 +6465,6 @@ def create_data_receive_error_event(self,
runnable = self.find_runnable(runnable_name)
if runnable is None:
raise KeyError(f"Found no runnable with name '{runnable_name}'")
unique_event_name = self._make_unique_event_name(event_name)
name_parts = split_ref_strict(port_data_element)
if len(name_parts) == 1:
port_name, data_element_name = name_parts[0], None
Expand All @@ -6421,6 +6477,18 @@ def create_data_receive_error_event(self,
if target_data_element is None:
msg = f"port_data_element: '{port_data_element}' does not name an existing data element in port interface"
raise ValueError(msg)
if event_name is None:
behavior_settings = self._get_valid_behavior_settings()
if behavior_settings.data_receive_error_event_prefix:
event_name = behavior_settings.data_receive_error_event_prefix + "_".join([runnable_name,
context_port.name,
target_data_element.name])
else:
msg = "event_name: Unable to dynamically create event name,"\
" data_receive_error_event_prefix is not set in behavior settings"
raise RuntimeError(msg)
assert isinstance(event_name, str)
unique_event_name = self._make_unique_event_name(event_name)
event = DataReceiveErrorEvent.make(unique_event_name,
runnable.ref(),
context_port.ref(),
Expand All @@ -6430,9 +6498,9 @@ def create_data_receive_error_event(self,
return event

def create_data_received_event(self,
event_name: str,
runnable_name: str,
data_element_ref: str,
event_name: str | None = None,
**kwargs
) -> DataReceivedEvent:
"""
Expand All @@ -6445,7 +6513,6 @@ def create_data_received_event(self,
runnable = self.find_runnable(runnable_name)
if runnable is None:
raise KeyError(f"Found no runnable with name '{runnable_name}'")
unique_event_name = self._make_unique_event_name(event_name)
name_parts = split_ref_strict(data_element_ref)
if len(name_parts) == 1:
port_name, data_element_name = name_parts[0], None
Expand All @@ -6458,6 +6525,18 @@ def create_data_received_event(self,
if target_data_element is None:
msg = f"data_element_ref: '{data_element_ref}' does not name an existing data element in port interface"
raise ValueError(msg)
if event_name is None:
behavior_settings = self._get_valid_behavior_settings()
if behavior_settings.data_receive_event_prefix:
event_name = behavior_settings.data_receive_event_prefix + "_".join([runnable_name,
context_port.name,
target_data_element.name])
else:
msg = "event_name: Unable to dynamically create event name,"\
" data_receive_event_prefix is not set in behavior settings"
raise RuntimeError(msg)
assert isinstance(event_name, str)
unique_event_name = self._make_unique_event_name(event_name)
event = DataReceivedEvent.make(unique_event_name,
runnable.ref(),
context_port.ref(),
Expand All @@ -6467,9 +6546,9 @@ def create_data_received_event(self,
return event

def create_data_send_completed_event(self,
event_name: str,
runnable_name: str,
data_element_ref: str,
event_name: str | None = None,
**kwargs
) -> DataSendCompletedEvent:
"""
Expand All @@ -6483,9 +6562,9 @@ def create_data_send_completed_event(self,
raise NotImplementedError("References to data send points are not yet supported")

def create_data_write_completed_event(self,
event_name: str,
runnable_name: str,
data_element_ref: str,
event_name: str | None = None,
**kwargs
) -> DataWriteCompletedEvent:
"""
Expand All @@ -6499,8 +6578,8 @@ def create_data_write_completed_event(self,
raise NotImplementedError("References to data write access are not yet supported")

def create_init_event(self,
event_name: str,
runnable_name: str,
event_name: str | None = None,
**kwargs
) -> InitEvent:
"""
Expand All @@ -6509,15 +6588,24 @@ def create_init_event(self,
runnable = self.find_runnable(runnable_name)
if runnable is None:
raise KeyError(f"Found no runnable with name '{runnable_name}'")
if event_name is None:
behavior_settings = self._get_valid_behavior_settings()
if behavior_settings.init_event_prefix:
event_name = behavior_settings.init_event_prefix + runnable_name
else:
msg = "event_name: Unable to dynamically create event name,"\
" init_event_prefix is not set in behavior settings"
raise RuntimeError(msg)
assert isinstance(event_name, str)
unique_event_name = self._make_unique_event_name(event_name)
event = InitEvent(unique_event_name, runnable.ref(), **kwargs)
self.append_event(event)
return event

def create_operation_invoked_event(self,
event_name: str,
runnable_name: str,
operation_ref: str,
event_name: str | None = None,
**kwargs
) -> OperationInvokedEvent:
"""
Expand All @@ -6528,7 +6616,6 @@ def create_operation_invoked_event(self,
runnable = self.find_runnable(runnable_name)
if runnable is None:
raise KeyError(f"Found no runnable with name '{runnable_name}'")
unique_event_name = self._make_unique_event_name(event_name)
name_parts = split_ref_strict(operation_ref)
if len(name_parts) == 1:
port_name, operation_name = name_parts[0], None
Expand All @@ -6540,6 +6627,19 @@ def create_operation_invoked_event(self,
target_provided_operation = swc.get_operation_in_port(context_port, operation_name)
if target_provided_operation is None:
raise ValueError(f"operation_ref: '{operation_ref}' does not name a valid operation in port interface")
if event_name is None:
behavior_settings = self._get_valid_behavior_settings()
if behavior_settings.operation_invoked_event_prefix:
prefix = behavior_settings.operation_invoked_event_prefix
event_name = prefix + "_".join([runnable_name,
context_port.name,
target_provided_operation.name])
else:
msg = "event_name: Unable to dynamically create event name,"\
" operation_invoked_event_prefix is not set in behavior settings"
raise RuntimeError(msg)
assert isinstance(event_name, str)
unique_event_name = self._make_unique_event_name(event_name)
event = OperationInvokedEvent.make(unique_event_name,
runnable.ref(),
context_port.ref(),
Expand All @@ -6549,9 +6649,9 @@ def create_operation_invoked_event(self,
return event

def create_swc_mode_manager_error_event(self,
event_name: str,
runnable_name: str,
port_name: str,
event_name: str | None = None,
**kwargs
) -> SwcModeManagerErrorEvent:
"""
Expand All @@ -6561,14 +6661,26 @@ def create_swc_mode_manager_error_event(self,
runnable = self.find_runnable(runnable_name)
if runnable is None:
raise KeyError(f"Found no runnable with name '{runnable_name}'")
unique_event_name = self._make_unique_event_name(event_name)
context_port = swc.find_p_port(port_name)
if context_port is None:
raise ValueError(f"port_name: '{port_name}' does not name an existing P-PORT or PR-PORT")
context_mode_declaration_group = swc.get_mode_declaration_group_in_port(context_port)
if context_mode_declaration_group is None:
msg = f"port_name: '{port_name}' does not name a valid ModeDeclarationGroupPrototype in port interface"
raise ValueError(msg)
if event_name is None:
behavior_settings = self._get_valid_behavior_settings()
if behavior_settings.swc_mode_manager_error_event_prefix:
prefix = behavior_settings.swc_mode_manager_error_event_prefix
event_name = prefix + "_".join([runnable_name,
context_port.name,
context_mode_declaration_group.name])
else:
msg = "event_name: Unable to dynamically create event name,"\
" swc_mode_manager_error_event_prefix is not set in behavior settings"
raise RuntimeError(msg)
assert isinstance(event_name, str)
unique_event_name = self._make_unique_event_name(event_name)
event = SwcModeManagerErrorEvent.make(unique_event_name,
runnable.ref(),
context_port.ref(),
Expand All @@ -6578,10 +6690,10 @@ def create_swc_mode_manager_error_event(self,
return event

def create_swc_mode_mode_switch_event(self,
event_name: str,
runnable_name: str,
mode_ref: str | list[str] | tuple[str, str],
activation: ar_enum.ModeActivationKind | None = None,
event_name: str | None = None,
**kwargs
) -> SwcModeSwitchEvent:
"""
Expand All @@ -6596,6 +6708,16 @@ def create_swc_mode_mode_switch_event(self,
runnable = self.find_runnable(runnable_name)
if runnable is None:
raise KeyError(f"Found no runnable with name '{runnable_name}'")
if event_name is None:
behavior_settings = self._get_valid_behavior_settings()
if behavior_settings.swc_mode_switch_event_prefix:
prefix = behavior_settings.swc_mode_switch_event_prefix
event_name = prefix + runnable_name
else:
msg = "event_name: Unable to dynamically create event name,"\
" swc_mode_switch_event_prefix is not set in behavior settings"
raise RuntimeError(msg)
assert isinstance(event_name, str)
unique_event_name = self._make_unique_event_name(event_name)
if isinstance(mode_ref, str):
context_port, context_mode_declaration_group, target_mode_declaration = (
Expand Down Expand Up @@ -6628,17 +6750,27 @@ def create_swc_mode_mode_switch_event(self,
return event

def create_timing_event(self,
event_name: str,
runnable_name: str,
period: int | float | None = None,
offset: int | float | None = None,
event_name: str | None = None,
**kwargs) -> TimingEvent:
"""
Adds a new TimingEvent to this object
"""
runnable = self.find_runnable(runnable_name)
if runnable is None:
raise KeyError(f"Found no runnable with name '{runnable_name}'")
if event_name is None:
behavior_settings = self._get_valid_behavior_settings()
if behavior_settings.timing_event_prefix:
prefix = behavior_settings.timing_event_prefix
event_name = prefix + runnable_name
else:
msg = "event_name: Unable to dynamically create event name,"\
" timing_event_prefix is not set in behavior settings"
raise RuntimeError(msg)
assert isinstance(event_name, str)
unique_event_name = self._make_unique_event_name(event_name)
event = TimingEvent(unique_event_name, runnable.ref(), period, offset, **kwargs)
self.append_event(event)
Expand Down
5 changes: 4 additions & 1 deletion src/autosar/xml/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ class Workspace(ar_element.PackageCollection):
"""

def __init__(self, config_file_path: str | None = None, document_root: str | None = None) -> None:
super().__init__(behavior_settings=ar_element.BehaviorSettings())
self.namespaces: dict[str, Namespace] = {}
self.documents: list[DocumentConfig] = []
self.document_mappings: list[PackageToDocumentMapping] = []
self.document_root = document_root
self.package_map: dict[str, ar_element.Package] = {} # Each key is user-defined
super().__init__()
if config_file_path is not None:
self.load_config(config_file_path)

Expand Down Expand Up @@ -219,6 +219,9 @@ def load_config(self, file_path: str) -> None:
if document_mapping is not None:
for mapping in document_mapping.values():
self._create_document_mapping_from_config(mapping)
behavior_settings = config.get("behavior", None)
if behavior_settings is not None:
self.behavior_settings.update(behavior_settings)

def _write_document_from_config(self,
writer: Writer,
Expand Down
Loading

0 comments on commit 330367c

Please sign in to comment.