From 60e8b24d016d1897f00c3308b40c8f9ceb7aedf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Walstr=C3=B6m?= Date: Mon, 12 Aug 2024 21:51:35 +0200 Subject: [PATCH] infamy: Refactor xpath handling between restconf and netconf implmentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previous xpath may or may not been an xpath, we still called it xpath. Remove the obscure function get_xpath() and instead transform to a URI in restconf.py This fix issue #490 Signed-off-by: Mattias Walström --- test/case/ietf_hardware/usb.py | 6 +- .../ietf_interfaces/iface_phys_address.py | 2 +- test/case/ietf_interfaces/ipv4_address.py | 3 +- test/case/ietf_interfaces/vlan_ping.py | 3 +- test/case/ietf_system/add_delete_user.py | 3 +- test/infamy/container.py | 3 +- test/infamy/iface.py | 9 ++- test/infamy/netconf.py | 21 ++---- test/infamy/restconf.py | 65 +++++++++++-------- test/infamy/transport.py | 11 ---- 10 files changed, 58 insertions(+), 68 deletions(-) diff --git a/test/case/ietf_hardware/usb.py b/test/case/ietf_hardware/usb.py index 22d6f6e89..54cee7c54 100755 --- a/test/case/ietf_hardware/usb.py +++ b/test/case/ietf_hardware/usb.py @@ -83,8 +83,7 @@ with test.step("Remove all hardware configuration"): for port in available: - xpath=target.get_xpath("/ietf-hardware:hardware/component", "name", port) - target.delete_xpath(xpath) + target.delete_xpath(f"/ietf-hardware:hardware/component[name='{port}']") with test.step("Verify USB ports locked"): for port in available: @@ -115,8 +114,7 @@ with test.step("Remove USB configuration, and reboot"): for port in available: - xpath=target.get_xpath("/ietf-hardware:hardware/component", "name", port) - target.delete_xpath(xpath) + target.delete_xpath(f"/ietf-hardware:hardware/component[name='{port}']") target.copy("running", "startup") target.reboot() if wait_boot(target) == False: diff --git a/test/case/ietf_interfaces/iface_phys_address.py b/test/case/ietf_interfaces/iface_phys_address.py index 60a21d26b..254a8dfa3 100755 --- a/test/case/ietf_interfaces/iface_phys_address.py +++ b/test/case/ietf_interfaces/iface_phys_address.py @@ -30,7 +30,7 @@ assert mac == cmac with test.step(f"Remove custom MAC address"): - xpath=target.get_iface_xpath(tport, "phys-address") + xpath=iface.get_iface_xpath(tport, "phys-address") target.delete_xpath(xpath) until(lambda: iface.get_phys_address(target, tport) == pmac) diff --git a/test/case/ietf_interfaces/ipv4_address.py b/test/case/ietf_interfaces/ipv4_address.py index b3ef99332..b0b6f1dea 100755 --- a/test/case/ietf_interfaces/ipv4_address.py +++ b/test/case/ietf_interfaces/ipv4_address.py @@ -40,8 +40,7 @@ until(lambda: iface.address_exist(target, interface_name, new_ip_address, proto='static')) with test.step(f"Remove IPv4 addresses from {interface_name}"): - xpath=target.get_iface_xpath(interface_name, path="ietf-ip:ipv4") - target.delete_xpath(xpath) + target.delete_xpath(f"/ietf-interfaces:interfaces/interface[name='{interface_name}']/ietf-ip:ipv4") with test.step("Get updated IP addresses"): until(lambda: iface.address_exist(target, interface_name, new_ip_address) == False) diff --git a/test/case/ietf_interfaces/vlan_ping.py b/test/case/ietf_interfaces/vlan_ping.py index c0be03db3..33216be3b 100755 --- a/test/case/ietf_interfaces/vlan_ping.py +++ b/test/case/ietf_interfaces/vlan_ping.py @@ -64,8 +64,7 @@ def test_ping(hport, should_pass): test_ping(hport,True) with test.step("Remove VLAN interface, and test again (should not be able to ping)"): - xpath=target.get_xpath("/ietf-interfaces:interfaces/interface", "name", f"{tport}.10") - target.delete_xpath(xpath) + target.delete_xpath(f"/ietf-interfaces:interfaces/interface[name='{tport}.10']") _, hport = env.ltop.xlate("host", "data") test_ping(hport,False) diff --git a/test/case/ietf_system/add_delete_user.py b/test/case/ietf_system/add_delete_user.py index 4e0b9eb68..ab01d5e6f 100755 --- a/test/case/ietf_system/add_delete_user.py +++ b/test/case/ietf_system/add_delete_user.py @@ -55,8 +55,7 @@ def generate_restrictred_credential(): assert user_found, f"User {username} not found" with test.step(f"Delete user ({username} / {hashed_password})"): - xpath=target.get_xpath("/ietf-system:system/authentication/user", "name", username) - target.delete_xpath(xpath) + target.delete_xpath(f"/ietf-system:system/authentication/user[name='{username}']") with test.step(f"Verify erasure of user ({username} / {hashed_password})"): running = target.get_config_dict("/ietf-system:system") diff --git a/test/infamy/container.py b/test/infamy/container.py index 4447b0053..ba5b9bf78 100644 --- a/test/infamy/container.py +++ b/test/infamy/container.py @@ -32,5 +32,4 @@ def running(self, name): return False def action(self, name, act): - xpath=self.system.get_xpath("/infix-containers:containers/container", "name", name, act) - return self.system.call_action(xpath) + return self.system.call_action(f"/infix-containers:containers/container[name='{name}']/{act}") diff --git a/test/infamy/iface.py b/test/infamy/iface.py index 322f334f9..ca8f757f0 100644 --- a/test/infamy/iface.py +++ b/test/infamy/iface.py @@ -2,6 +2,13 @@ Fetch interface status from remote device. """ +def get_iface_xpath(iface, path=None): + """Compose complete XPath to a YANG node in /ietf-interfaces""" + xpath=f"/ietf-interfaces:interfaces/interface[name='{iface}']" + if not path is None: + xpath=f"{xpath}/{path}" + return xpath + def _iface_extract_param(json_content, param): """Returns (extracted) value for parameter 'param'""" interfaces = json_content.get('interfaces') @@ -17,7 +24,7 @@ def _iface_extract_param(json_content, param): def _iface_get_param(target, iface, param=None): """Fetch target dict for iface and extract param from JSON""" - content = target.get_data(target.get_iface_xpath(iface, param)) + content = target.get_data(get_iface_xpath(iface, param)) return _iface_extract_param(content, param) def interface_exist(target, iface): diff --git a/test/infamy/netconf.py b/test/infamy/netconf.py index 553fbe083..3b75e4695 100644 --- a/test/infamy/netconf.py +++ b/test/infamy/netconf.py @@ -15,6 +15,7 @@ import lxml import netconf_client.connect import netconf_client.ncclient +import infamy.iface as iface from infamy.transport import Transport from netconf_client.error import RpcError @@ -381,31 +382,19 @@ def get_schema(self, schema, outdir): with open(outdir+"/"+schema["filename"], "w") as f: f.write(data.schema) - def get_xpath(self, xpath, key, value, path=None): - """Compose complete XPath to a YANG node in /ietf-interfaces""" - xpath = f"{xpath}[{key}='{value}']" - if not path is None: - xpath=f"{xpath}/{path}" - return xpath - - def get_iface_xpath(self, iface, path=None): - """Compose complete XPath to a YANG node in /ietf-interfaces""" - xpath = f"/ietf-interfaces:interfaces/interface" - return self.get_xpath(xpath, "name", iface, path) - - def get_iface(self, iface): + def get_iface(self, name): """Fetch target dict for iface and extract param from JSON""" - content = self.get_data(self.get_iface_xpath(iface)) + content = self.get_data(iface.get_iface_xpath(name)) interface=content.get("interfaces", {}).get("interface", None) if interface is None: return None # Restconf (rousette) address by id and netconf (netopper2) address by name - return interface[iface] + return interface[name] def delete_xpath(self, xpath): - # Split out the model and the container from xpath + # Split out the model and the container from xpath' pattern = r"^/(?P[^:]+):(?P[^/]+)" match = re.search(pattern, xpath) module = match.group('module') diff --git a/test/infamy/restconf.py b/test/infamy/restconf.py index 0b27f72ad..273aa3ebd 100644 --- a/test/infamy/restconf.py +++ b/test/infamy/restconf.py @@ -4,6 +4,8 @@ import os import sys import libyang +import re +import urllib.parse from requests.auth import HTTPBasicAuth from urllib.parse import quote @@ -23,6 +25,24 @@ class Location: username: str = "admin" port: int = 443 +def xpath_to_uri(xpath, extra=None): + """Convert xpath to HTTP URI""" + # If the xpath has a + pattern = r'\[(.*?)=["\'](.*?)["\']\]' + matches = re.findall(pattern, xpath) + + if matches: + for key, value in matches: + # replace [key=value] with =value + uri_path = re.sub(rf'\[{key}=["\']{value}["\']\]', f'={value}', xpath) + else: + uri_path = xpath + + # Append extra if provided + if extra is not None: + uri_path = f"{uri_path}/{extra}" + + return uri_path # Workaround for bug in requests 2.32.x: https://github.com/psf/requests/issues/6735 def requests_workaround(method, url, json, headers, auth, verify=False): @@ -152,25 +172,25 @@ def _get_raw(self, url, parse=True): else: return response.content - def get_datastore(self, datastore="operational" , xpath="", parse=True): + def get_datastore(self, datastore="operational" , path="", parse=True): """Get a datastore""" - path=f"/ds/ietf-datastores:{datastore}" - if not xpath is None: - path=f"{path}/{xpath}" - url=f"{self.restconf_url}{path}" + dspath=f"/ds/ietf-datastores:{datastore}" + if not path is None: + dspath=f"{dspath}/{path}" + url=f"{self.restconf_url}{dspath}" return self._get_raw(url, parse) - def get_running(self, xpath=None): + def get_running(self, path=None): """Wrapper function to get running datastore""" - return self.get_datastore("running", xpath) + return self.get_datastore("running", path) - def get_operational(self, xpath=None, parse=True): + def get_operational(self, path=None, parse=True): """Wrapper function to get operational datastore""" - return self.get_datastore("operational", xpath, parse) + return self.get_datastore("operational", path, parse) - def get_factory(self, xpath=None): + def get_factory(self, path=None): """Wrapper function to get factory defaults""" - return self.get_datastore("factory-default", xpath) + return self.get_datastore("factory-default", path) def post_datastore(self, datastore, data): """Actually send a POST to RESTCONF server""" @@ -226,11 +246,10 @@ def get_dict(self, xpath=None, parse=True): """NETCONF compat function, just wraps get_data""" return self.get_data(xpath, parse) - def get_data(self, xpath=None, key=None, value=None, parse=True): + def get_data(self, xpath=None, parse=True): """Get operational data""" - if key: - xpath=f"{xpath}={value}" - data=self.get_operational(xpath, parse) + uri=xpath_to_uri(xpath) if xpath is not None else None + data=self.get_operational(uri, parse) if parse==False: return data @@ -258,7 +277,8 @@ def factory_default(self): return self.call_rpc("infix-factory-default:factory-default") def call_action(self, xpath): - url=f"{self.restconf_url}/data{xpath}" + path=xpath_to_uri(xpath) + url=f"{self.restconf_url}/data{path}" response=requests_workaround_post( url, json=None, @@ -268,15 +288,6 @@ def call_action(self, xpath): response.raise_for_status() # Raise an exception for HTTP errors return response.content - def get_xpath(self, xpath, key, value, path=None): - """Compose complete XPath to a YANG node""" - xpath=f"{xpath}={value}" - - if not path is None: - xpath=f"{xpath}/{path}" - - return xpath - def get_current_time_with_offset(self): """Parse the time in the raw reply, before it has been passed through libyang, there all offset is lost""" data=self.get_data("/ietf-system:system-state/clock", parse=False) @@ -285,7 +296,7 @@ def get_current_time_with_offset(self): def get_iface(self, iface): """Fetch target dict for iface and extract param from JSON""" - content=self.get_data(self.get_iface_xpath(iface)) + content=self.get_data(f"/ietf-interfaces:interfaces/interface={iface}") interface=content.get("interfaces", {}).get("interface", None) if interface is None: return None @@ -295,7 +306,7 @@ def get_iface(self, iface): def delete_xpath(self, xpath): """Delete XPath from running config""" - path=f"/ds/ietf-datastores:running/{xpath}" + path=f"/ds/ietf-datastores:running/{xpath_to_uri(xpath)}" url=f"{self.restconf_url}{path}" response=requests_workaround_delete(url, headers=self.headers, auth=self.auth, verify=False) response.raise_for_status() # Raise an exception for HTTP errors diff --git a/test/infamy/transport.py b/test/infamy/transport.py index 480f8ee67..924cebe91 100644 --- a/test/infamy/transport.py +++ b/test/infamy/transport.py @@ -17,9 +17,6 @@ def put_config_dict(self, modname, edit): def get_dict(self, xpath=None): pass @abstractmethod - def get_xpath(self, xpath, key, value, path=None): - pass - @abstractmethod def delete_xpath(self, xpath): pass @abstractmethod @@ -36,9 +33,6 @@ def get_current_time_with_offset(self): def call_action(self, xpath): pass @abstractmethod - def get_iface_xpath(self, iface, path=None): - pass - @abstractmethod def get_iface(self, iface): # Should be common, but is not due to bug in rousette pass @@ -54,8 +48,3 @@ def reachable(self): """Check if the device reachable on ll6""" neigh = ll6ping(self.location.interface, flags=["-w1", "-c1", "-L", "-n"]) return bool(neigh) - - def get_iface_xpath(self, iface, path=None): - """Compose complete XPath to a YANG node in /ietf-interfaces""" - xpath = f"/ietf-interfaces:interfaces/interface" - return self.get_xpath(xpath, "name", iface, path)