From 5a087dc1ed269203f747e8fb3496f1f72f1a093d Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 19 Sep 2024 18:50:50 +0200 Subject: [PATCH] fixup! fixup! Issue #150/#156 integrate DeepGraphSplitter in AggregatorBatchJobs._create_crossbackend_job --- src/openeo_aggregator/testing.py | 4 +- tests/partitionedjobs/test_api.py | 149 +++++++++++++++++++++++++++++- 2 files changed, 150 insertions(+), 3 deletions(-) diff --git a/src/openeo_aggregator/testing.py b/src/openeo_aggregator/testing.py index 8ec90de..9f044b8 100644 --- a/src/openeo_aggregator/testing.py +++ b/src/openeo_aggregator/testing.py @@ -66,13 +66,13 @@ def exists(self, path): def get(self, path): self._assert_open() if path not in self.data: - raise kazoo.exceptions.NoNodeError() + raise kazoo.exceptions.NoNodeError(path) return self.data[path] def get_children(self, path): self._assert_open() if path not in self.data: - raise kazoo.exceptions.NoNodeError() + raise kazoo.exceptions.NoNodeError(path) parent = path.split("/") return [p.split("/")[-1] for p in self.data if p.split("/")[:-1] == parent] diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py index 826eb5e..956cae6 100644 --- a/tests/partitionedjobs/test_api.py +++ b/tests/partitionedjobs/test_api.py @@ -697,7 +697,10 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1, split_strateg """Handling of single "load_collection" process graph""" api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) - pg = {"lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}, "result": True}} + pg = { + # lc1 (that's it, that's the graph) + "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}, "result": True} + } res = api100.post( "/jobs", @@ -773,6 +776,9 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, dummy2, reques api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) pg = { + # lc1 lc2 + # \ / + # merge "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}}, "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}}, "merge": { @@ -913,6 +919,9 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, dummy2, r api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) pg = { + # lc1 lc2 + # \ / + # merge "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}}, "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}}, "merge": { @@ -999,6 +1008,9 @@ def test_failing_create(self, flask_app, api100, zk_db, dummy1, dummy2, split_st dummy2.fail_create_job = True pg = { + # lc1 lc2 + # \ / + # merge "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}}, "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}}, "merge": { @@ -1028,3 +1040,138 @@ def test_failing_create(self, flask_app, api100, zk_db, dummy1, dummy2, split_st "created": self.now.rfc3339, "progress": 0, } + + @now.mock + def test_create_job_deep_basic(self, flask_app, api100, zk_db, dummy1, dummy2, requests_mock): + api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) + + pg = { + # lc1 lc2 + # | | + # bands1 temporal2 + # \ / + # merge + "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}}, + "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}}, + "bands1": {"process_id": "filter_bands", "arguments": {"data": {"from_node": "lc1"}}}, + "temporal2": {"process_id": "filter_temporal", "arguments": {"data": {"from_node": "lc2"}}}, + "merge": { + "process_id": "merge_cubes", + "arguments": {"cube1": 
{"from_node": "bands1"}, "cube2": {"from_node": "temporal2"}},
+                "result": True,
+            },
+        }
+
+        requests_mock.get(
+            "https://b2.test/v1/jobs/2-jb-0/results?partial=true",
+            json={"links": [{"rel": "canonical", "href": "https://data.b2.test/123abc"}]},
+        )
+
+        split_strategy = {"crossbackend": {"method": "deep", "primary_backend": "b1"}}
+        res = api100.post(
+            "/jobs",
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": split_strategy},
+            },
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["Location"] == f"http://oeoa.test/openeo/1.0.0/jobs/{expected_job_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "created",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
+
+        # Inspect stored parent job metadata
+        assert zk_db.get_pjob_metadata(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "user_id": TEST_USER,
+            "created": self.now.epoch,
+            "process": {"process_graph": pg},
+            "metadata": {"log_level": "info"},
+            "job_options": {"split_strategy": split_strategy},
+            "result_jobs": ["main"],
+        }
+
+        assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "status": "created",
+            "message": approx_str_contains("{'created': 2}"),
+            "timestamp": pytest.approx(self.now.epoch, abs=5),
+            "progress": 0,
+        }
+
+        # Inspect stored subjob metadata
+        subjobs = zk_db.list_subjobs(user_id=TEST_USER, pjob_id=pjob_id)
+        assert subjobs == {
+            "b2:temporal2": {
+                "backend_id": "b2",
+                "process_graph": {
+                    "lc2": {"arguments": {"id": "T22"}, "process_id": "load_collection"},
+                    "temporal2": {"arguments": {"data": {"from_node": "lc2"}}, "process_id": "filter_temporal"},
+                    "_agg_crossbackend_save_result": {
+                        "arguments": {"data": {"from_node": "temporal2"}, "format": "GTiff"},
+                        "process_id": "save_result",
+                        "result": True,
+                    },
+                },
+                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='b2:temporal2'",
+            },
+            "main": {
+                "backend_id": "b1",
+                "process_graph": {
+                    "lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection"},
+                    "bands1": {"arguments": {"data": {"from_node": "lc1"}}, "process_id": "filter_bands"},
+                    "temporal2": {"arguments": {"url": "https://data.b2.test/123abc"}, "process_id": "load_stac"},
+                    "merge": {
+                        "arguments": {"cube1": {"from_node": "bands1"}, "cube2": {"from_node": "temporal2"}},
+                        "process_id": "merge_cubes",
+                        "result": True,
+                    },
+                },
+                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='main'",
+            },
+        }
+        sjob_id = "main"
+        expected_job_id = "1-jb-0"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": self.now.epoch,
+            "message": None,
+        }
+        assert zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == expected_job_id
+        assert dummy1.get_job_status(TEST_USER, expected_job_id) == "created"
+        assert dummy1.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
+            "lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection"},
+            "bands1": {"arguments": {"data": {"from_node": "lc1"}}, "process_id": "filter_bands"},
+            "temporal2": {"arguments": {"url": "https://data.b2.test/123abc"}, "process_id": "load_stac"},
+            "merge": {
+                "arguments": {"cube1": {"from_node": "bands1"}, "cube2": {"from_node": "temporal2"}},
+                "process_id": "merge_cubes",
+                "result": True,
+            },
+        }
+        sjob_id = "b2:temporal2"
+        expected_job_id = "2-jb-0"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": self.now.epoch,
+            "message": None,
+        }
+        assert zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == expected_job_id
+        assert dummy2.get_job_status(TEST_USER, expected_job_id) == "created"
+        assert dummy2.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
+            "lc2": {"arguments": {"id": "T22"}, "process_id": "load_collection"},
+            "temporal2": {"arguments": {"data": {"from_node": "lc2"}}, "process_id": "filter_temporal"},
+            "_agg_crossbackend_save_result": {
+                "arguments": {"data": {"from_node": "temporal2"}, "format": "GTiff"},
+                "process_id": "save_result",
+                "result": True,
+            },
+        }