From 4a55a6753f3cf21b57c0e51be1fc4d8e6ced8a24 Mon Sep 17 00:00:00 2001 From: Daisy Modi Date: Mon, 20 Apr 2026 19:39:28 +0530 Subject: [PATCH] [GOBBLIN-2259] Coerce $UNKNOWN ExecutionStatus to PENDING in FlowExecutionResource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ExecutionStatus.$UNKNOWN is a Pegasus in-memory sentinel, not a symbol declared in ExecutionStatus.pdl. When it reaches the response, Rest.li rejects it during serialization and returns HTTP 500 for the entire collection — a single poisoned record takes down every execution in the same latestFlowExecution batch. This can happen when no flow-level (NA/NA) status event was persisted for an execution, e.g., because ReevaluateDagProc early-returned without emitting the flow-level event while a concurrent DagProc cleaned up the Dag. The result is a "zombie" execution: job-level terminal rows exist in the state store, but no NA row, so JobStatusRetriever.getFlowStatusFromJobStatuses returns its $UNKNOWN default. Guard at the REST boundary: coerce $UNKNOWN to PENDING before building the FlowExecution response, and log a WARN so the underlying data-quality issue stays observable. PENDING is the least-misleading valid enum value for "flow exists but terminal state unknown" — polling clients continue polling, so we never falsely report a terminal status we cannot verify. Added tests: - testConvertFlowStatusCoercesUnknownToPending - testConvertFlowStatusPreservesValidStatus --- .../service/FlowExecutionResource.java | 16 +++++++++++- .../FlowExecutionResourceHandlerTest.java | 26 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java index 0bcdf569430..779943fc3d4 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java @@ -220,13 +220,27 @@ public static FlowExecution convertFlowStatus(FlowStatus monitoringFlowStatus, jobStatusArray.sort(Comparator.comparing((org.apache.gobblin.service.JobStatus js) -> js.getExecutionStatistics().getExecutionStartTime())); + // $UNKNOWN is a Pegasus in-memory sentinel (not declared in ExecutionStatus.pdl) that arises + // when no flow-level (NA/NA) status event was persisted for an execution. This can happen when + // the FlowSucceeded/FlowFailed GobblinTrackingEvent is emitted by the orchestrator but + // KafkaAvroJobStatusMonitor fails to persist it to the state store (e.g., Kafka consumer + // shard-rebalance / checkpoint issues). Serializing $UNKNOWN produces HTTP 500 and poisons the + // whole collection response. Coerce to PENDING so the record serializes; polling callers will + // keep polling until a terminal state becomes known. + ExecutionStatus flowExecutionStatus = monitoringFlowStatus.getFlowExecutionStatus(); + if (flowExecutionStatus == ExecutionStatus.$UNKNOWN) { + log.warn("FlowExecution {}/{}/{} has $UNKNOWN flow status; coercing to PENDING. Check state store for data quality issue.", + flowId.getFlowGroup(), flowId.getFlowName(), monitoringFlowStatus.getFlowExecutionId()); + flowExecutionStatus = ExecutionStatus.PENDING; + } + return new FlowExecution() .setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName()) .setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId())) .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus)) .setExecutionEndTime(flowEndTime)) .setMessage(flowMessage) - .setExecutionStatus(monitoringFlowStatus.getFlowExecutionStatus()) + .setExecutionStatus(flowExecutionStatus) .setJobStatuses(jobStatusArray) .setIssues(new IssueArray(flowIssues)); } diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java index 0333a167b73..f71094bf152 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java @@ -17,12 +17,38 @@ package org.apache.gobblin.service; +import java.util.Collections; + import org.testng.Assert; import org.testng.annotations.Test; +import org.apache.gobblin.service.monitoring.FlowStatus; + public class FlowExecutionResourceHandlerTest { + @Test + public void testConvertFlowStatusCoercesUnknownToPending() { + FlowStatus unknownFlowStatus = new FlowStatus("flowName", "flowGroup", 123L, + Collections.emptyIterator(), ExecutionStatus.$UNKNOWN); + + FlowExecution flowExecution = FlowExecutionResource.convertFlowStatus(unknownFlowStatus, false); + + Assert.assertNotNull(flowExecution); + Assert.assertEquals(flowExecution.getExecutionStatus(), ExecutionStatus.PENDING); + } + + @Test + public void testConvertFlowStatusPreservesValidStatus() { + FlowStatus runningFlowStatus = new FlowStatus("flowName", "flowGroup", 123L, + Collections.emptyIterator(), ExecutionStatus.RUNNING); + + FlowExecution flowExecution = FlowExecutionResource.convertFlowStatus(runningFlowStatus, false); + + Assert.assertNotNull(flowExecution); + Assert.assertEquals(flowExecution.getExecutionStatus(), ExecutionStatus.RUNNING); + } + @Test public void testEstimateCopyTimeLeftSanityCheck() { long currentTime = 10000;