From 08fff94d5b673833f4c5049ac1ba923738488996 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Wed, 29 Apr 2026 16:10:20 +0200 Subject: [PATCH] provide pg/tracer to log added value from https://github.com/Open-EO/openeo-geopyspark-driver/issues/1436 --- openeo_driver/backend.py | 5 ++++- openeo_driver/processgraph/registry.py | 10 +++++----- openeo_driver/views.py | 19 ++++++++++++++++--- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py index f6c805eb..0cc49b2c 100644 --- a/openeo_driver/backend.py +++ b/openeo_driver/backend.py @@ -19,6 +19,7 @@ from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable, Container, Any, Tuple import flask +from openeo_driver.processgraph.definitions import ProcessGraphFlatDict import openeo_driver.util.view_helpers from openeo.utils.version import ComparableVersion @@ -786,7 +787,7 @@ class Processing(MicroService): def get_process_registry(self, api_version: Union[str, ComparableVersion]) -> ProcessRegistry: raise NotImplementedError - def evaluate(self, process_graph: dict, env: EvalEnv = None): + def evaluate(self, process_graph: dict, env: EvalEnv = None, do_dry_run: Union[bool, DryRunDataTracer] = True): """Evaluate given process graph (flat dict format).""" raise NotImplementedError @@ -1060,9 +1061,11 @@ def request_costs( self, *, user: User, + process_graph: ProcessGraphFlatDict, job_options: Union[dict, None] = None, request_id: str, success: bool, + tracer: DryRunDataTracer, ) -> Optional[float]: """ Report resource usage of (current) synchronous processing request and get associated cost. diff --git a/openeo_driver/processgraph/registry.py b/openeo_driver/processgraph/registry.py index 24819bea..556a76ba 100644 --- a/openeo_driver/processgraph/registry.py +++ b/openeo_driver/processgraph/registry.py @@ -13,6 +13,7 @@ from openeo.utils.version import ComparableVersion from openeo_driver.backend import OpenEoBackendImplementation, Processing +from openeo_driver.dry_run import DryRunDataTracer from openeo_driver.errors import OpenEOApiException from openeo_driver.processes import DEFAULT_NAMESPACE, ProcessArgs, ProcessRegistry, ProcessSpec from openeo_driver.specs import SPECS_ROOT @@ -238,7 +239,7 @@ def get_basic_env(self, api_version: str = OPENEO_API_VERSION_DEFAULT) -> EvalEn } ) - def evaluate(self, process_graph: dict, env: EvalEnv = None): + def evaluate(self, process_graph: dict, env: EvalEnv = None, do_dry_run: Union[bool, DryRunDataTracer] = True): from openeo_driver.processgraph.evaluator import evaluate return evaluate(process_graph=process_graph, env=env or self.get_basic_env(), do_dry_run=False) @@ -257,9 +258,10 @@ def get_process_registry(self, api_version: Union[str, ComparableVersion]) -> Pr else: raise OpenEOApiException(message=f"No process support for openEO version {api_version}") - def evaluate(self, process_graph: dict, env: EvalEnv = None): + def evaluate(self, process_graph: dict, env: EvalEnv = None, do_dry_run: Union[bool, DryRunDataTracer] = True): from openeo_driver.processgraph.evaluator import evaluate - return evaluate(process_graph=process_graph, env=env) + + return evaluate(process_graph=process_graph, env=env, do_dry_run=do_dry_run) def validate(self, process_graph: dict, env: EvalEnv = None): from openeo_driver.processgraph.evaluator import evaluate, _collect_end_nodes, convert_node @@ -304,5 +306,3 @@ def validate(self, process_graph: dict, env: EvalEnv = None): def extra_validation(self, process_graph, env, result, source_constraints): return [] - - diff --git a/openeo_driver/views.py b/openeo_driver/views.py index 8a97f0ca..fe34ef07 100644 --- a/openeo_driver/views.py +++ b/openeo_driver/views.py @@ -55,6 +55,7 @@ LINK_REL, ) from openeo_driver.datacube import DriverMlModel +from openeo_driver.dry_run import DryRunDataTracer from openeo_driver.errors import ( FeatureUnsupportedException, FilePathInvalidException, @@ -725,8 +726,10 @@ def result(user: User): } ) + tracer = DryRunDataTracer() + try: - result = backend_implementation.processing.evaluate(process_graph=process_graph, env=env) + result = backend_implementation.processing.evaluate(process_graph=process_graph, env=env, do_dry_run=tracer) _log.info(f"`POST /result`: {type(result)}") if result is None: @@ -743,7 +746,12 @@ def result(user: User): response = result.create_flask_response() costs = backend_implementation.request_costs( - success=True, user=user, request_id=request_id, job_options=job_options + success=True, + user=user, + process_graph=process_graph, + request_id=request_id, + job_options=job_options, + tracer=tracer, ) if costs: # TODO not all costs are accounted for so don't expose in "OpenEO-Costs" yet @@ -752,7 +760,12 @@ def result(user: User): except Exception: # TODO: also send "OpenEO-Costs" header on failure backend_implementation.request_costs( - success=False, user=user, request_id=request_id, job_options=job_options + success=False, + user=user, + process_graph=process_graph, + request_id=request_id, + job_options=job_options, + tracer=tracer, ) raise