diff --git a/openeogeotrellis/load_stac.py b/openeogeotrellis/load_stac.py
index 52c7b974..ef6db7ee 100644
--- a/openeogeotrellis/load_stac.py
+++ b/openeogeotrellis/load_stac.py
@@ -137,13 +137,41 @@ def operator_value(criterion: Dict[str, object]) -> (str, object):
 
     collection = None
 
-    # TODO: `user` might be None
-    dependency_job_info = (extract_own_job_info(url, user_id=user.user_id, batch_jobs=batch_jobs) if batch_jobs
-                           else None)
+    backend_config = get_backend_config()
+    max_poll_time = time.time() + backend_config.job_dependencies_max_poll_delay_seconds
+
+    def get_dependency_job_info() -> Optional[BatchJobMetadata]:
+        # TODO: `user` might be None
+        return (extract_own_job_info(url, user_id=user.user_id, batch_jobs=batch_jobs) if batch_jobs
+                else None)
+
+    dependency_job_info = get_dependency_job_info()
 
     if dependency_job_info:
-        # TODO: handle polling own job results as well (~ GpsBatchJobs.poll_job_dependencies)
         logger.info(f"load_stac of results of own job {dependency_job_info.id}")
+
+        while True:
+            partial_job_status = PARTIAL_JOB_STATUS.for_job_status(dependency_job_info.status)
+
+            logger.debug(f"OpenEO batch job results status of own job {dependency_job_info.id}: {partial_job_status}")
+
+            if partial_job_status in [PARTIAL_JOB_STATUS.ERROR, PARTIAL_JOB_STATUS.CANCELED]:
+                logger.error(f"Failing because own OpenEO batch job {dependency_job_info.id} failed")
+            elif partial_job_status in [None, PARTIAL_JOB_STATUS.FINISHED]:
+                break  # not a partial job result or success: proceed
+
+            # still running: continue polling
+            if time.time() >= max_poll_time:
+                max_poll_delay_reached_error = (f"OpenEO batch job results dependency of "
+                                                f"own job {dependency_job_info.id} was not satisfied after"
+                                                f" {backend_config.job_dependencies_max_poll_delay_seconds} s, aborting")
+
+                raise Exception(max_poll_delay_reached_error)
+
+            time.sleep(backend_config.job_dependencies_poll_interval_seconds)
+
+            dependency_job_info = get_dependency_job_info()
+
         intersecting_items = []
 
         for asset_id, asset in batch_jobs.get_result_assets(job_id=dependency_job_info.id,
@@ -185,22 +213,19 @@ def operator_value(criterion: Dict[str, object]) -> (str, object):
         logger.info(f"load_stac of arbitrary URL {url}")
 
         # TODO: move this polling logic to a dedicated method
-        backend_config = get_backend_config()
-        max_poll_time = time.time() + backend_config.job_dependencies_max_poll_delay_seconds
-
         while True:
             # TODO: add retry
             stac_object = pystac.read_file(href=url)  # TODO: does this have a timeout set?
 
-            job_results_status = (stac_object
-                                  .to_dict(include_self_link=False, transform_hrefs=False)
-                                  .get('openeo:status'))
+            partial_job_status = (stac_object
+                                  .to_dict(include_self_link=False, transform_hrefs=False)
+                                  .get('openeo:status'))
 
-            logger.debug(f"OpenEO batch job results status for {url}: {job_results_status}")
+            logger.debug(f"OpenEO batch job results status of {url}: {partial_job_status}")
 
-            if job_results_status in [PARTIAL_JOB_STATUS.ERROR, PARTIAL_JOB_STATUS.CANCELED]:
+            if partial_job_status in [PARTIAL_JOB_STATUS.ERROR, PARTIAL_JOB_STATUS.CANCELED]:
                 logger.error(f"Failing because OpenEO batch job with results at {url} failed")
-            elif job_results_status in [None, PARTIAL_JOB_STATUS.FINISHED]:
+            elif partial_job_status in [None, PARTIAL_JOB_STATUS.FINISHED]:
                 break  # not a partial job result or success: proceed
 
             # still running: continue polling
diff --git a/tests/test_api_result.py b/tests/test_api_result.py
index d3299f74..55c57c03 100644
--- a/tests/test_api_result.py
+++ b/tests/test_api_result.py
@@ -4114,10 +4114,10 @@ def item_json(path):
 
         api110.result(process_graph).assert_status_code(200)
 
-        assert ("OpenEO batch job results status for"
+        assert ("OpenEO batch job results status of"
                 " https://openeo.test/openeo/jobs/j-2402094545c945c09e1307503aa58a3a/results?partial=true: running"
                 in caplog.messages)
-        assert ("OpenEO batch job results status for"
+        assert ("OpenEO batch job results status of"
                 " https://openeo.test/openeo/jobs/j-2402094545c945c09e1307503aa58a3a/results?partial=true: finished"
                 in caplog.messages)
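Context note: both polling branches in the patch follow the same wait-until-terminal pattern (check the partial job status, proceed on "finished" or on non-partial results, give up after job_dependencies_max_poll_delay_seconds, otherwise sleep for job_dependencies_poll_interval_seconds and retry). The following is a minimal standalone sketch of that pattern, not the backend's actual code: fetch_partial_status and the module-level constants are hypothetical stand-ins for the backend config values, and the sketch fails fast on error/canceled for brevity.

import time
from typing import Callable, Optional

# Hypothetical stand-ins for the backend_config.job_dependencies_* settings used in the patch.
POLL_INTERVAL_SECONDS = 30
MAX_POLL_DELAY_SECONDS = 3600

# Partial job statuses, mirroring the PARTIAL_JOB_STATUS values referenced in the patch.
FINISHED = "finished"
ERROR = "error"
CANCELED = "canceled"


def wait_for_dependency(fetch_partial_status: Callable[[], Optional[str]]) -> None:
    """Poll until the dependency's (partial) job results are usable, or give up after a deadline.

    fetch_partial_status returns the current partial job status, or None when the
    results are not partial job results at all.
    """
    max_poll_time = time.time() + MAX_POLL_DELAY_SECONDS

    while True:
        partial_job_status = fetch_partial_status()

        if partial_job_status in [ERROR, CANCELED]:
            raise Exception(f"dependency failed with status {partial_job_status}")
        if partial_job_status in [None, FINISHED]:
            return  # not a partial job result, or success: proceed

        # still running: check the deadline, then sleep and retry
        if time.time() >= max_poll_time:
            raise Exception(f"dependency was not satisfied after {MAX_POLL_DELAY_SECONDS} s, aborting")

        time.sleep(POLL_INTERVAL_SECONDS)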