diff --git a/README.md b/README.md
index 88ff87d..75ceb41 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,9 @@
[![Sonatype Jake](https://github.com/paterva/maltego-trx/actions/workflows/sonatype-jack.yml/badge.svg)](https://github.com/paterva/maltego-trx/actions/workflows/sonatype-jack.yml)
## Release Notes
+
+__1.6.0__: Automatically generate am `.mtz` for your local transforms
+
__1.5.2__: Add logging output for invalid / missing params in xml serialization
__1.5.1__: Add ignored files to starter and use README for pypi
@@ -305,6 +308,36 @@ registry.write_settings_config()
handle_run(__name__, sys.argv, application)
```
+### Generating an `.mtz` config with your local Transforms
+
+Since `maltego-trx>=1.6.0` you can generate an `.mtz` config file with your local transforms.
+
+If you're already using the `TransformRegistry`, just invoke the `write_local_config()` method.
+
+```python
+# project.py
+
+registry.write_local_mtz()
+```
+
+This will create a file called `local.mtz` in the current directory. You can then import this file into Maltego and
+start using your local transforms faster. Just remember that settings are not passed to local transforms.
+
+The method takes in the same arguments as the interface in the Maltego client.
+If you are using a `virtualenv` environment, you might want to change the `command` argument to use that.
+
+```bash
+# project.py
+
+registry.write_local_mtz(
+ mtz_path: str = "./local.mtz", # path to the local .mtz file
+ working_dir: str = ".",
+ command: str = "python3", # for a venv you might want to use `./venv/bin/python3`
+ params: str = "project.py",
+ debug: bool = True
+)
+```
+
## Legacy Transforms
[Documentation](https://docs.maltego.com/support/solutions/articles/15000018299-porting-old-trx-transforms-to-the-latest-version)
diff --git a/maltego_trx/__init__.py b/maltego_trx/__init__.py
index 185297d..4c41a25 100644
--- a/maltego_trx/__init__.py
+++ b/maltego_trx/__init__.py
@@ -1 +1 @@
-VERSION = "1.5.2"
\ No newline at end of file
+VERSION = "1.6.0"
diff --git a/maltego_trx/decorator_registry.py b/maltego_trx/decorator_registry.py
index f07fffe..45eda28 100644
--- a/maltego_trx/decorator_registry.py
+++ b/maltego_trx/decorator_registry.py
@@ -1,17 +1,41 @@
+import logging
import os
+import zipfile
+from collections import defaultdict
from dataclasses import dataclass, field
from itertools import chain
-from typing import List, Optional, Dict, Iterable
+from typing import List, Optional, Dict, Iterable, Tuple
-from maltego_trx.utils import filter_unique, pascal_case_to_title, escape_csv_fields, export_as_csv, serialize_bool, \
- name_to_path
+from maltego_trx.mtz import (
+ create_local_server_xml,
+ create_settings_xml,
+ create_transform_xml,
+ create_transform_set_xml,
+)
+from maltego_trx.utils import (
+ filter_unique,
+ pascal_case_to_title,
+ escape_csv_fields,
+ export_as_csv,
+ serialize_bool,
+ name_to_path,
+ serialize_xml,
+)
-TRANSFORMS_CSV_HEADER = "Owner,Author,Disclaimer,Description,Version," \
- "Name,UIName,URL,entityName," \
- "oAuthSettingId,transformSettingIDs,seedIDs"
+TRANSFORMS_CSV_HEADER = (
+ "Owner,Author,Disclaimer,Description,Version,"
+ "Name,UIName,URL,entityName,"
+ "oAuthSettingId,transformSettingIDs,seedIDs"
+)
SETTINGS_CSV_HEADER = "Name,Type,Display,DefaultValue,Optional,Popup"
+@dataclass(frozen=True)
+class TransformSet:
+ name: str
+ description: str
+
+
@dataclass()
class TransformMeta:
class_name: str
@@ -20,9 +44,10 @@ class TransformMeta:
description: str
output_entities: List[str]
disclaimer: str
+ transform_set: TransformSet
-@dataclass()
+@dataclass(frozen=True)
class TransformSetting:
name: str
display_name: str
@@ -49,87 +74,196 @@ class TransformRegistry:
host_url: str
seed_ids: List[str]
- version: str = '0.1'
+ version: str = "0.1"
display_name_suffix: str = ""
global_settings: List[TransformSetting] = field(default_factory=list)
oauth_settings_id: Optional[str] = ""
transform_metas: Dict[str, TransformMeta] = field(init=False, default_factory=dict)
- transform_settings: Dict[str, List[TransformSetting]] = field(init=False, default_factory=dict)
+ transform_settings: Dict[str, List[TransformSetting]] = field(
+ init=False, default_factory=dict
+ )
+ transform_sets: Dict[TransformSet, List[str]] = field(
+ init=False, default_factory=lambda: defaultdict(list)
+ )
- def register_transform(self, display_name: str, input_entity: str, description: str,
- settings: List[TransformSetting] = None, output_entities: List[str] = None,
- disclaimer: str = ""):
- """ This method can be used as a decorator on transform classes. The data will be used to fill out csv config
- files to be imported into a TDS.
+ def register_transform(
+ self,
+ display_name: str,
+ input_entity: str,
+ description: str,
+ settings: List[TransformSetting] = None,
+ output_entities: List[str] = None,
+ disclaimer: str = "",
+ transform_set: TransformSet = None,
+ ):
+ """This method can be used as a decorator on transform classes. The data will be used to fill out csv config
+ files to be imported into a TDS.
"""
def decorated(transform_callable: object):
cleaned_transform_name = name_to_path(transform_callable.__name__)
display = display_name or pascal_case_to_title(transform_callable.__name__)
- meta = TransformMeta(cleaned_transform_name,
- display, input_entity,
- description,
- output_entities or [],
- disclaimer)
+ meta = TransformMeta(
+ cleaned_transform_name,
+ display,
+ input_entity,
+ description,
+ output_entities or [],
+ disclaimer,
+ transform_set=transform_set,
+ )
self.transform_metas[cleaned_transform_name] = meta
if settings:
self.transform_settings[cleaned_transform_name] = settings
+ if transform_set:
+ self.transform_sets[transform_set].append(cleaned_transform_name)
+
return transform_callable
return decorated
- def write_transforms_config(self, config_path: str = "./transforms.csv", csv_line_limit: int = 100):
- """Exports the collected transform metadata as a csv-file to config_path"""
+ def _create_transforms_config(self) -> Iterable[str]:
global_settings_full_names = [gs.id for gs in self.global_settings]
- csv_lines = []
for transform_name, transform_meta in self.transform_metas.items():
- meta_settings = [setting.id for setting in
- self.transform_settings.get(transform_name, [])]
+ meta_settings = [
+ setting.id
+ for setting in self.transform_settings.get(transform_name, [])
+ ]
transform_row = [
- self.owner,
- self.author,
- transform_meta.disclaimer,
- transform_meta.description,
- self.version,
- transform_name,
- transform_meta.display_name + self.display_name_suffix,
- os.path.join(self.host_url, "run", transform_name),
- transform_meta.input_entity,
- ";".join(self.oauth_settings_id),
- # combine global and transform scoped settings
- ";".join(chain(meta_settings, global_settings_full_names)),
- ";".join(self.seed_ids)
+ self.owner,
+ self.author,
+ transform_meta.disclaimer,
+ transform_meta.description,
+ self.version,
+ transform_name,
+ transform_meta.display_name + self.display_name_suffix,
+ os.path.join(self.host_url, "run", transform_name),
+ transform_meta.input_entity,
+ ";".join(self.oauth_settings_id),
+ # combine global and transform scoped settings
+ ";".join(chain(meta_settings, global_settings_full_names)),
+ ";".join(self.seed_ids),
]
escaped_fields = escape_csv_fields(*transform_row)
- csv_lines.append(",".join(escaped_fields))
+ yield ",".join(escaped_fields)
+
+ def write_transforms_config(
+ self, config_path: str = "./transforms.csv", csv_line_limit: int = 100
+ ):
+ """Exports the collected transform metadata as a csv-file to config_path"""
- export_as_csv(TRANSFORMS_CSV_HEADER, csv_lines, config_path, csv_line_limit)
+ csv_lines = self._create_transforms_config()
- def write_settings_config(self, config_path: str = "./settings.csv", csv_line_limit: int = 100):
- """Exports the collected settings metadata as a csv-file to config_path"""
- chained_settings = chain(self.global_settings, *list(self.transform_settings.values()))
- unique_settings: Iterable[TransformSetting] = filter_unique(lambda s: s.name, chained_settings)
+ export_as_csv(
+ TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
+ )
+
+ def _create_settings_config(self) -> Iterable[str]:
+ chained_settings = chain(
+ self.global_settings, *list(self.transform_settings.values())
+ )
+ unique_settings: Iterable[TransformSetting] = filter_unique(
+ lambda s: s.name, chained_settings
+ )
- csv_lines = []
for setting in unique_settings:
setting_row = [
- setting.id,
- setting.setting_type,
- setting.display_name,
- setting.default_value or "",
- serialize_bool(setting.optional, 'True', 'False'),
- serialize_bool(setting.popup, 'Yes', 'No')
+ setting.id,
+ setting.setting_type,
+ setting.display_name,
+ setting.default_value or "",
+ serialize_bool(setting.optional, "True", "False"),
+ serialize_bool(setting.popup, "Yes", "No"),
]
escaped_fields = escape_csv_fields(*setting_row)
- csv_lines.append(",".join(escaped_fields))
+ yield ",".join(escaped_fields)
+
+ def write_settings_config(
+ self, config_path: str = "./settings.csv", csv_line_limit: int = 100
+ ):
+ """Exports the collected settings metadata as a csv-file to config_path"""
+
+ csv_lines = self._create_settings_config()
+
+ export_as_csv(
+ SETTINGS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
+ )
+
+ def _create_local_mtz(
+ self,
+ working_dir: str = ".",
+ command: str = "python3",
+ params: str = "project.py",
+ debug: bool = True,
+ ) -> Iterable[Tuple[str, str]]:
+ working_dir = os.path.abspath(working_dir)
+ if self.global_settings:
+ logging.warning(
+ f"Settings are not supported with local transforms. "
+ f"Global settings are: {', '.join(map(lambda s: s.name, self.global_settings))}"
+ )
+
+ """Creates an .mtz for bulk importing local transforms"""
+ server_xml = create_local_server_xml(self.transform_metas.keys())
+
+ server_xml_str = serialize_xml(server_xml)
+ yield "Servers/Local.tas", server_xml_str
+
+ for name, meta in self.transform_metas.items():
+ settings_xml = create_settings_xml(
+ working_dir, command, f"{params} local {name}", debug
+ )
+ settings_xml_str = serialize_xml(settings_xml)
+
+ tx_settings = self.transform_settings.get(name)
+ if tx_settings:
+ logging.warning(
+ "Settings are not supported with local transforms. "
+ f"Transform '{meta.display_name}' has: {', '.join(map(lambda s: s.name, tx_settings))}"
+ )
+
+ xml = create_transform_xml(
+ name,
+ meta.display_name,
+ meta.description,
+ meta.input_entity,
+ self.author,
+ )
+
+ xml_str = serialize_xml(xml)
+
+ yield f"TransformRepositories/Local/{name}.transform", xml_str
+ yield f"TransformRepositories/Local/{name}.transformsettings", settings_xml_str
+
+ for transform_set, transforms in self.transform_sets.items():
+ set_xml = create_transform_set_xml(
+ transform_set.name, transform_set.description, transforms
+ )
+
+ set_xml_str = serialize_xml(set_xml)
+
+ yield f"TransformSets/{transform_set.name}.set", set_xml_str
+
+ def write_local_mtz(
+ self,
+ mtz_path: str = "./local.mtz",
+ working_dir: str = ".",
+ command: str = "python3",
+ params: str = "project.py",
+ debug: bool = True,
+ ):
- export_as_csv(SETTINGS_CSV_HEADER, csv_lines, config_path, csv_line_limit)
+ with zipfile.ZipFile(mtz_path, "w") as mtz:
+ for path, content in self._create_local_mtz(
+ working_dir, command, params, debug
+ ):
+ mtz.writestr(path, content)
diff --git a/maltego_trx/mtz.py b/maltego_trx/mtz.py
new file mode 100644
index 0000000..212173b
--- /dev/null
+++ b/maltego_trx/mtz.py
@@ -0,0 +1,179 @@
+import datetime
+from typing import Iterable, List
+from xml.etree.ElementTree import Element, SubElement
+
+
+def create_last_sync_timestamp(timestamp: datetime.datetime = None) -> str:
+ timestamp = timestamp or datetime.datetime.utcnow()
+ return timestamp.strftime("%Y-%m-%d %H:%M:%S UTC")
+
+
+def create_local_server_xml(transform_names: Iterable[str]) -> Element:
+ server_xml = Element("MaltegoServer", attrib={"name": "Local",
+ "enabled": "true",
+ "description": "Local transforms hosted on this machine",
+ "url": "http://localhost",
+ })
+ last_sync_xml = SubElement(server_xml, "LastSync")
+ last_sync_xml.text = create_last_sync_timestamp()
+
+ SubElement(server_xml, "Protocol", attrib={"version": "0.0"})
+ SubElement(server_xml, "Authentication", attrib={"type": "none"})
+
+ transforms_xml = SubElement(server_xml, "Transforms")
+ for name in transform_names:
+ SubElement(transforms_xml, "Transform", attrib={"name": name})
+
+ SubElement(server_xml, "Seeds")
+
+ return server_xml
+
+
+def create_settings_xml(working_dir: str, command: str, params: str, debug: bool) -> Element:
+ settings_xml = Element("TransformSettings",
+ attrib={"enabled": "true",
+ "disclaimerAccepted": "false",
+ "showHelp": "true",
+ "runWithAll": "true",
+ "favorite": "false",
+ })
+ properties_xml = SubElement(settings_xml, "Properties")
+
+ command_xml = SubElement(properties_xml, "Property",
+ attrib={"name": "transform.local.command",
+ "type": "string",
+ "popup": "false",
+ })
+ command_xml.text = command
+
+ parameters_xml = SubElement(properties_xml, "Property",
+ attrib={
+ "name": "transform.local.parameters",
+ "type": "string",
+ "popup": "false",
+ })
+ parameters_xml.text = params
+
+ working_directory_xml = SubElement(properties_xml, "Property",
+ attrib={
+ "name": "transform.local.working-directory",
+ "type": "string",
+ "popup": "false",
+ })
+ working_directory_xml.text = working_dir
+
+ debug_xml = SubElement(properties_xml, "Property",
+ attrib={
+ "name": "transform.local.debug",
+ "type": "boolean",
+ "popup": "false",
+ })
+ debug_xml.text = "true" if debug else "false"
+
+ return settings_xml
+
+
+def create_transform_xml(name: str, display_name: str, description: str, input_entity: str, author: str) -> Element:
+ transform_xml = Element("MaltegoTransform",
+ attrib={"name": name,
+ "displayName": display_name,
+ "abstract": "false",
+ "template": "false",
+ "visibility": "public",
+ "description": description,
+ "author": author,
+ "requireDisplayInfo": "false",
+ })
+
+ adapter_xml = SubElement(transform_xml, "TransformAdapter")
+ adapter_xml.text = "com.paterva.maltego.transform.protocol.v2api.LocalTransformAdapterV2"
+
+ properties_xml = SubElement(transform_xml, "Properties")
+ fields_xml = SubElement(properties_xml, "Fields")
+
+ SubElement(fields_xml, "Property",
+ attrib={"name": "transform.local.command",
+ "type": "string",
+ "nullable": "false",
+ "hidden": "false",
+ "readonly": "false",
+ "description": "The command to execute for this transform",
+ "popup": "false",
+ "abstract": "false",
+ "visibility": "public",
+ "auth": "false",
+ "displayName": "Command line",
+ })
+ SubElement(fields_xml, "Property",
+ attrib={"name": "transform.local.parameters",
+ "type": "string",
+ "nullable": "true",
+ "hidden": "false",
+ "readonly": "false",
+ "description": "The parameters to pass to the transform command",
+ "popup": "false",
+ "abstract": "false",
+ "visibility": "public",
+ "auth": "false",
+ "displayName": "Command parameters",
+ })
+ SubElement(fields_xml, "Property",
+ attrib={"name": "transform.local.working-directory",
+ "type": "string",
+ "nullable": "true",
+ "hidden": "false",
+ "readonly": "false",
+ "description": "The working directory used when invoking the executable",
+ "popup": "false",
+ "abstract": "false",
+ "visibility": "public",
+ "auth": "false",
+ "displayName": "Working directory",
+
+ })
+ SubElement(fields_xml, "Property",
+ attrib={
+ "name": "transform.local.debug",
+ "type": "boolean",
+ "nullable": "true",
+ "hidden": "false",
+ "readonly": "false",
+ "description": "When this is set, the transform's text output will be "
+ "printed to the output window",
+ "popup": "false",
+ "abstract": "false",
+ "visibility": "public",
+ "auth": "false",
+ "displayName": "Show debug info"
+ })
+
+ input_constraints_xml = SubElement(transform_xml, "InputConstraints")
+ SubElement(input_constraints_xml, "Entity",
+ attrib={"type": input_entity,
+ "min": "1", "max": "1"})
+
+ SubElement(transform_xml, "OutputEntities")
+ SubElement(transform_xml, "defaultSets")
+
+ stealth_level = SubElement(transform_xml, "StealthLevel")
+ stealth_level.text = "0"
+
+ return transform_xml
+
+
+def create_transform_set_xml(name: str, description: str, transforms: List[str]) -> Element:
+ set_xml = Element("TransformSet",
+ attrib={
+ "name": name,
+ "description": description
+ })
+
+ transforms_xml = SubElement(set_xml, "Transforms")
+ for transform in transforms:
+ SubElement(transforms_xml,
+ "Transform",
+ attrib={
+ "name": transform
+ })
+
+ return set_xml
diff --git a/setup.py b/setup.py
index 829c38a..9d55a91 100644
--- a/setup.py
+++ b/setup.py
@@ -27,7 +27,7 @@
packages=[
'maltego_trx',
'maltego_trx/template_dir',
- 'maltego_trx/template_dir/transforms'
+ 'maltego_trx/template_dir/transforms',
],
package_data={
'maltego_trx/template_dir': [
diff --git a/tests/__snapshots__/test_decorator_registry.ambr b/tests/__snapshots__/test_decorator_registry.ambr
new file mode 100644
index 0000000..1c1873c
--- /dev/null
+++ b/tests/__snapshots__/test_decorator_registry.ambr
@@ -0,0 +1,38 @@
+# name: test_write_local_mtz
+ list([
+ tuple(
+ 'Servers/Local.tas',
+ '2022-08-10 07:52:45 UTC',
+ ),
+ tuple(
+ 'TransformRepositories/Local/testclass.transform',
+ 'com.paterva.maltego.transform.protocol.v2api.LocalTransformAdapterV20',
+ ),
+ tuple(
+ 'TransformRepositories/Local/testclass.transformsettings',
+ 'python3project.py local testclass/home/maltegotrue',
+ ),
+ tuple(
+ 'TransformSets/test.set',
+ '',
+ ),
+ ])
+# ---
+# name: test_write_local_mtz.1
+ list([
+ 'Servers/Local.tas',
+ 'TransformRepositories/Local/testclass.transform',
+ 'TransformRepositories/Local/testclass.transformsettings',
+ 'TransformSets/test.set',
+ ])
+# ---
+# name: test_write_local_mtz_emit_global_settings_warning
+ list([
+ 'Settings are not supported with local transforms. Global settings are: test',
+ ])
+# ---
+# name: test_write_local_mtz_emit_settings_warning
+ list([
+ "Settings are not supported with local transforms. Transform 'Test Class' has: test",
+ ])
+# ---
diff --git a/tests/__snapshots__/test_mtz.ambr b/tests/__snapshots__/test_mtz.ambr
new file mode 100644
index 0000000..59f2931
--- /dev/null
+++ b/tests/__snapshots__/test_mtz.ambr
@@ -0,0 +1,21 @@
+# name: test_create_local_server_xml
+ '2022-08-10 07:52:45 UTC'
+# ---
+# name: test_create_settings_xml[kwargs0]
+ 'python3project.py.true'
+# ---
+# name: test_create_settings_xml[kwargs1]
+ 'venv/bin/python3main.py~/project/maltegofalse'
+# ---
+# name: test_create_transform_set_xml[kwargs0]
+ ''
+# ---
+# name: test_create_transform_set_xml[kwargs1]
+ ''
+# ---
+# name: test_create_transform_xml[kwargs0]
+ 'com.paterva.maltego.transform.protocol.v2api.LocalTransformAdapterV20'
+# ---
+# name: test_create_transform_xml[kwargs1]
+ 'com.paterva.maltego.transform.protocol.v2api.LocalTransformAdapterV20'
+# ---
diff --git a/tests/requirements.txt b/tests/requirements.txt
index ce11114..22b502c 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -1,6 +1,9 @@
-pytest==7.0.1; python_version == '3.6'
+pytest==7.0.1; python_version <= '3.6'
pytest==7.1.2; python_version > '3.6'
-syrupy==2.3.0
+pytest-mock==3.6.1; python_version <= '3.6'
+pytest-mock==3.8.2; python_version > '3.6'
+
+syrupy==2.3.1
petname==2.6
\ No newline at end of file
diff --git a/tests/test_decorator_registry.py b/tests/test_decorator_registry.py
new file mode 100644
index 0000000..e562ec6
--- /dev/null
+++ b/tests/test_decorator_registry.py
@@ -0,0 +1,275 @@
+import logging
+import os
+import random
+import tempfile
+import zipfile
+from typing import NamedTuple, List
+
+import petname
+import pytest as pytest
+from pytest_mock import MockerFixture
+
+from maltego_trx.decorator_registry import (
+ TransformSetting,
+ TransformRegistry,
+ TRANSFORMS_CSV_HEADER,
+ SETTINGS_CSV_HEADER,
+ TransformSet,
+)
+from maltego_trx.server import app
+from maltego_trx.utils import name_to_path, serialize_bool
+from tests.test_xml import _serialize_xml
+
+
+@pytest.fixture
+def client():
+ with app.test_client() as client:
+ yield client
+
+
+@pytest.fixture
+def registry():
+ registry: TransformRegistry = TransformRegistry(
+ owner="Maltego Technologies GmbH",
+ author="Maltego Support",
+ host_url="localhost",
+ seed_ids=["demo"],
+ )
+ return registry
+
+
+def make_transform_setting(global_setting: bool = None):
+ name = petname.generate()
+ setting_type = random.choice(
+ ["string", "boolean", "date", "datetime", "daterange", "url", "double", "int"]
+ )
+
+ return TransformSetting(
+ name=name,
+ display_name=name.title(),
+ setting_type=random.choice(setting_type),
+ default_value=petname.generate(),
+ optional=random.choice([True, False]),
+ popup=random.choice([True, False]),
+ global_setting=global_setting or random.choice([True, False]),
+ )
+
+
+def make_transform(
+ registry: TransformRegistry, settings: List[TransformSetting] = None
+):
+ display_name = petname.generate(separator=" ")
+ input_entity = petname.generate(separator=".")
+ description = petname.generate(words=10, separator=" ").title() + "."
+ settings = settings or [make_transform_setting(), make_transform_setting()]
+ output_entities = petname.generate(3).split("-")
+ disclaimer = petname.generate(words=10, separator=" ").title() + "."
+
+ @registry.register_transform(
+ display_name, input_entity, description, settings, output_entities, disclaimer
+ )
+ class TestClass:
+ pass
+
+ return TestClass
+
+
+def test_register_transform_decorator(registry):
+ test_settings = [make_transform_setting(), make_transform_setting()]
+
+ display_name = petname.generate(separator=" ")
+ input_entity = petname.generate(separator=".")
+ description = petname.generate(words=10, separator=" ").title() + "."
+ output_entities = petname.generate(3).split("-")
+ disclaimer = petname.generate(words=10, separator=" ").title() + "."
+
+ @registry.register_transform(
+ display_name,
+ input_entity,
+ description,
+ test_settings,
+ output_entities,
+ disclaimer,
+ )
+ class TestClass:
+ pass
+
+ path_name = name_to_path(TestClass.__name__)
+
+ tx_meta = registry.transform_metas.get(path_name)
+
+ assert tx_meta
+ assert tx_meta.display_name == display_name
+ assert tx_meta.input_entity == input_entity
+ assert tx_meta.description == description
+ assert tx_meta.disclaimer == disclaimer
+
+ assert test_settings == registry.transform_settings[path_name]
+
+
+class TransformCsvLine(NamedTuple):
+ owner: str
+ author: str
+ disclaimer: str
+ description: str
+ version: str
+ name: str
+ display_name: str
+ host: str
+ input_entity: str
+ oauth_id: str
+ settings_ids: str
+ seed_ids: str
+
+
+class SettingCsvLine(NamedTuple):
+ name: str
+ setting_type: str
+ display_name: str
+ default: str
+ optional: str
+ popup: str
+
+
+def test_transform_to_csv(registry: TransformRegistry):
+ random_class = make_transform(registry)
+
+ path_name = name_to_path(random_class.__name__)
+
+ tx_meta = registry.transform_metas.get(path_name)
+ tx_settings = registry.transform_settings.get(path_name, [])
+
+ registry.write_transforms_config()
+
+ with open("./transforms.csv") as transforms_csv:
+ header = next(transforms_csv)
+ assert header.rstrip("\n") == TRANSFORMS_CSV_HEADER
+
+ line = next(transforms_csv).rstrip("\n")
+ data: TransformCsvLine = TransformCsvLine(*line.split(","))
+
+ assert data.owner == registry.owner
+ assert data.author == registry.author
+ assert data.disclaimer == tx_meta.disclaimer
+ assert data.description == tx_meta.description
+ assert data.version == registry.version
+ assert data.name == tx_meta.class_name
+ assert data.display_name == tx_meta.display_name
+ assert data.host == os.path.join(registry.host_url, "run", path_name)
+ assert data.input_entity == tx_meta.input_entity
+ assert data.oauth_id == registry.oauth_settings_id
+ assert data.settings_ids.split(";") == [s.id for s in tx_settings]
+ assert data.seed_ids.split(";") == registry.seed_ids
+
+
+def test_setting_to_csv(registry: TransformRegistry):
+ local_setting = make_transform_setting(global_setting=False)
+
+ global_setting = make_transform_setting(global_setting=True)
+
+ registry.global_settings.append(global_setting)
+
+ @registry.register_transform("", "", "", settings=[local_setting])
+ class TestClass:
+ pass
+
+ registry.write_settings_config()
+ with open("./settings.csv") as settings_csv:
+ header = next(settings_csv)
+ assert header.rstrip("\n") == SETTINGS_CSV_HEADER
+
+ for line, setting in zip(
+ settings_csv.readlines(), [global_setting, local_setting]
+ ):
+ line = line.rstrip("\n")
+ data: SettingCsvLine = SettingCsvLine(*line.split(","))
+
+ assert data.name == setting.id
+ assert data.setting_type == setting.setting_type
+ assert data.display_name == setting.display_name
+ assert data.default == setting.default_value
+ assert data.optional == serialize_bool(setting.optional, "True", "False")
+ assert data.popup == serialize_bool(setting.popup, "Yes", "No")
+
+
+def test_write_local_mtz_emit_global_settings_warning(
+ registry: TransformRegistry, caplog, snapshot
+):
+ registry.global_settings = [
+ TransformSetting(
+ name="test", display_name="test", setting_type="string", global_setting=True
+ )
+ ]
+
+ with caplog.at_level(logging.WARNING):
+ list(registry._create_local_mtz())
+
+ assert caplog.messages == snapshot
+
+
+def test_write_local_mtz_emit_settings_warning(
+ registry: TransformRegistry, caplog, snapshot
+):
+ local_setting = TransformSetting(
+ name="test",
+ display_name="test",
+ setting_type="string",
+ global_setting=True,
+ )
+
+ @registry.register_transform("", "", "", settings=[local_setting])
+ class TestClass:
+ pass
+
+ with caplog.at_level(logging.WARNING):
+ list(registry._create_local_mtz())
+
+ assert caplog.messages == snapshot
+
+
+def test_write_local_mtz(registry: TransformRegistry, mocker: MockerFixture, snapshot):
+ mocker.patch(
+ "maltego_trx.mtz.create_last_sync_timestamp",
+ return_value="2022-08-10 07:52:45 UTC",
+ )
+ mocker.patch("maltego_trx.decorator_registry.serialize_xml", _serialize_xml)
+
+ transform_set = TransformSet(name="test", description="Test Transform Set")
+
+ @registry.register_transform("", "", "", transform_set=transform_set)
+ class TestClass:
+ pass
+
+ mtz_files = list(registry._create_local_mtz(working_dir="/home/maltego"))
+
+ assert mtz_files == snapshot
+
+ files = [path for path, content in mtz_files]
+
+ assert files == snapshot
+
+
+def test_write_local_mtz_file(registry: TransformRegistry, mocker: MockerFixture, snapshot):
+ mocker.patch(
+ "maltego_trx.mtz.create_last_sync_timestamp",
+ return_value="2022-08-10 07:52:45 UTC",
+ )
+ mocker.patch("maltego_trx.decorator_registry.serialize_xml", _serialize_xml)
+
+ transform_set = TransformSet(name="test", description="Test Transform Set")
+
+ @registry.register_transform("", "", "", transform_set=transform_set)
+ class TestClass:
+ pass
+
+ with tempfile.TemporaryDirectory() as working_dir:
+ mtz_path = os.path.join(working_dir, "local.mtz")
+
+ registry.write_local_mtz(mtz_path=mtz_path)
+
+ assert os.path.exists(mtz_path), "Local mtz file not created"
+
+ with zipfile.ZipFile(mtz_path) as local_mtz:
+ assert all(
+ file.file_size > 0 for file in local_mtz.infolist()
+ ), "Empty files in local mtz"
diff --git a/tests/test_mtz.py b/tests/test_mtz.py
new file mode 100644
index 0000000..7e4c3fb
--- /dev/null
+++ b/tests/test_mtz.py
@@ -0,0 +1,97 @@
+import re
+
+import pytest
+
+from maltego_trx.mtz import (
+ create_local_server_xml,
+ create_settings_xml,
+ create_transform_xml,
+ create_transform_set_xml,
+)
+from tests.test_xml import _serialize_xml
+
+
+def test_create_local_server_xml(mocker, snapshot):
+ mocker.patch(
+ "maltego_trx.mtz.create_last_sync_timestamp",
+ return_value="2022-08-10 07:52:45 UTC",
+ )
+
+ transforms = ["to_lower", "to_upper", "to_title"]
+
+ server_xml = create_local_server_xml(transforms)
+ server_xml_str = _serialize_xml(server_xml)
+
+ assert server_xml_str == snapshot
+
+
+@pytest.mark.parametrize(
+ "kwargs",
+ [
+ {
+ "working_dir": ".",
+ "command": "python3",
+ "params": "project.py",
+ "debug": True,
+ },
+ {
+ "working_dir": "~/project/maltego",
+ "command": "venv/bin/python3",
+ "params": "main.py",
+ "debug": False,
+ },
+ ],
+)
+def test_create_settings_xml(kwargs, snapshot):
+ settings_xml = create_settings_xml(**kwargs)
+ settings_xml_str = _serialize_xml(settings_xml)
+
+ assert settings_xml_str == snapshot
+
+
+@pytest.mark.parametrize(
+ "kwargs",
+ [
+ {
+ "name": "to_lower",
+ "display_name": "To Lower",
+ "description": "Converts the input to lowercase",
+ "input_entity": "maltego.Phrase",
+ "author": "Maltego Team",
+ },
+ {
+ "name": "to_upper",
+ "display_name": "To Upper",
+ "description": "Converts the input to uppercase",
+ "input_entity": "maltego.Text",
+ "author": "Maltego Organization",
+ },
+ ],
+)
+def test_create_transform_xml(kwargs, snapshot):
+ transform_xml = create_transform_xml(**kwargs)
+ transform_xml_str = _serialize_xml(transform_xml)
+
+ assert transform_xml_str == snapshot
+
+
+@pytest.mark.parametrize(
+ "kwargs",
+ [
+ {
+ "name": "text_transforms",
+ "description": "Basic text transforms",
+ "transforms": ["to_lower", "to_upper", "to_title"],
+ },
+ {
+ "name": "name_transforms",
+ "description": "Name Transforms",
+ "transforms": ["remove_lastname", "expand_middle_name", "to_initials"],
+ },
+ ],
+)
+def test_create_transform_set_xml(kwargs, snapshot):
+ transform_set_xml = create_transform_set_xml(**kwargs)
+ transform_set_xml_str = _serialize_xml(transform_set_xml)
+
+ assert transform_set_xml_str == snapshot
diff --git a/tests/test_registry.py b/tests/test_registry.py
deleted file mode 100644
index 2200e97..0000000
--- a/tests/test_registry.py
+++ /dev/null
@@ -1,165 +0,0 @@
-import os
-import random
-from typing import NamedTuple, List
-
-import petname
-import pytest as pytest
-
-from maltego_trx.decorator_registry import TransformSetting, TransformRegistry, TRANSFORMS_CSV_HEADER, \
- SETTINGS_CSV_HEADER
-from maltego_trx.server import app
-from maltego_trx.utils import name_to_path, serialize_bool
-
-
-@pytest.fixture
-def client():
- with app.test_client() as client:
- yield client
-
-
-@pytest.fixture
-def registry():
- registry: TransformRegistry = TransformRegistry(owner="Maltego Technologies GmbH",
- author="Maltego Support",
- host_url="localhost",
- seed_ids=["demo"])
- return registry
-
-
-def make_transform_setting():
- name = petname.generate()
- setting_type = random.choice(['string', 'boolean', 'date', 'datetime', 'daterange', 'url', 'double', 'int'])
-
- return TransformSetting(name=name,
- display_name=name.title(),
- setting_type=random.choice(setting_type),
- default_value=petname.generate(),
- optional=random.choice([True, False]),
- popup=random.choice([True, False]),
- global_setting=random.choice([True, False]))
-
-
-def make_transform(registry: TransformRegistry, settings: List[TransformSetting] = None):
- display_name = petname.generate(separator=" ")
- input_entity = petname.generate(separator=".")
- description = petname.generate(words=10, separator=" ").title() + "."
- settings = settings or [make_transform_setting(), make_transform_setting()]
- output_entities = petname.generate(3).split("-")
- disclaimer = petname.generate(words=10, separator=" ").title() + "."
-
- @registry.register_transform(display_name, input_entity, description, settings, output_entities, disclaimer)
- class TestClass:
- pass
-
- return TestClass
-
-
-def test_register_transform_decorator(registry):
- test_settings = [make_transform_setting(), make_transform_setting()]
-
- display_name = petname.generate(separator=" ")
- input_entity = petname.generate(separator=".")
- description = petname.generate(words=10, separator=" ").title() + "."
- output_entities = petname.generate(3).split("-")
- disclaimer = petname.generate(words=10, separator=" ").title() + "."
-
- @registry.register_transform(display_name, input_entity, description, test_settings, output_entities, disclaimer)
- class TestClass:
- pass
-
- path_name = name_to_path(TestClass.__name__)
-
- tx_meta = registry.transform_metas.get(path_name)
-
- assert tx_meta
- assert tx_meta.display_name == display_name
- assert tx_meta.input_entity == input_entity
- assert tx_meta.description == description
- assert tx_meta.disclaimer == disclaimer
-
- assert test_settings == registry.transform_settings[path_name]
-
-
-class TransformCsvLine(NamedTuple):
- owner: str
- author: str
- disclaimer: str
- description: str
- version: str
- name: str
- display_name: str
- host: str
- input_entity: str
- oauth_id: str
- settings_ids: str
- seed_ids: str
-
-
-class SettingCsvLine(NamedTuple):
- name: str
- setting_type: str
- display_name: str
- default: str
- optional: str
- popup: str
-
-
-def test_transform_to_csv(registry):
- random_class = make_transform(registry)
-
- path_name = name_to_path(random_class.__name__)
-
- tx_meta = registry.transform_metas.get(path_name)
- tx_settings = registry.transform_settings.get(path_name, [])
-
- registry.write_transforms_config()
-
- with open("./transforms.csv") as transforms_csv:
- header = next(transforms_csv)
- assert header.rstrip("\n") == TRANSFORMS_CSV_HEADER
-
- line = next(transforms_csv).rstrip("\n")
- data: TransformCsvLine = TransformCsvLine(*line.split(','))
-
- assert data.owner == registry.owner
- assert data.author == registry.author
- assert data.disclaimer == tx_meta.disclaimer
- assert data.description == tx_meta.description
- assert data.version == registry.version
- assert data.name == tx_meta.class_name
- assert data.display_name == tx_meta.display_name
- assert data.host == os.path.join(registry.host_url, "run", path_name)
- assert data.input_entity == tx_meta.input_entity
- assert data.oauth_id == registry.oauth_settings_id
- assert data.settings_ids.split(";") == [s.id for s in tx_settings]
- assert data.seed_ids.split(";") == registry.seed_ids
-
-
-def test_setting_to_csv(registry):
- local_setting = make_transform_setting()
- local_setting.global_setting = False
-
- global_setting = make_transform_setting()
- global_setting.global_setting = True
-
- registry.global_settings.append(global_setting)
-
- @registry.register_transform("", "", "", settings=[local_setting])
- class TestClass:
- pass
-
- registry.write_settings_config()
- with open("./settings.csv") as settings_csv:
- header = next(settings_csv)
- assert header.rstrip("\n") == SETTINGS_CSV_HEADER
-
- for line, setting in zip(settings_csv.readlines(), [global_setting, local_setting]):
- line = line.rstrip("\n")
- data: SettingCsvLine = SettingCsvLine(*line.split(','))
-
- assert data.name == setting.id
- assert data.setting_type == setting.setting_type
- assert data.display_name == setting.display_name
- assert data.default == setting.default_value
- assert data.optional == serialize_bool(setting.optional, 'True', 'False')
- assert data.popup == serialize_bool(setting.popup, 'Yes', 'No')