diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java index 0bcdf569430..779943fc3d4 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java @@ -220,13 +220,27 @@ public static FlowExecution convertFlowStatus(FlowStatus monitoringFlowStatus, jobStatusArray.sort(Comparator.comparing((org.apache.gobblin.service.JobStatus js) -> js.getExecutionStatistics().getExecutionStartTime())); + // $UNKNOWN is a Pegasus in-memory sentinel (not declared in ExecutionStatus.pdl) that arises + // when no flow-level (NA/NA) status event was persisted for an execution. This can happen when + // the FlowSucceeded/FlowFailed GobblinTrackingEvent is emitted by the orchestrator but + // KafkaAvroJobStatusMonitor fails to persist it to the state store (e.g., Kafka consumer + // shard-rebalance / checkpoint issues). Serializing $UNKNOWN produces HTTP 500 and poisons the + // whole collection response. Coerce to PENDING so the record serializes; polling callers will + // keep polling until a terminal state becomes known. + ExecutionStatus flowExecutionStatus = monitoringFlowStatus.getFlowExecutionStatus(); + if (flowExecutionStatus == ExecutionStatus.$UNKNOWN) { + log.warn("FlowExecution {}/{}/{} has $UNKNOWN flow status; coercing to PENDING. Check state store for data quality issue.", + flowId.getFlowGroup(), flowId.getFlowName(), monitoringFlowStatus.getFlowExecutionId()); + flowExecutionStatus = ExecutionStatus.PENDING; + } + return new FlowExecution() .setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName()) .setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId())) .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus)) .setExecutionEndTime(flowEndTime)) .setMessage(flowMessage) - .setExecutionStatus(monitoringFlowStatus.getFlowExecutionStatus()) + .setExecutionStatus(flowExecutionStatus) .setJobStatuses(jobStatusArray) .setIssues(new IssueArray(flowIssues)); } diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java index 0333a167b73..f71094bf152 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java @@ -17,12 +17,38 @@ package org.apache.gobblin.service; +import java.util.Collections; + import org.testng.Assert; import org.testng.annotations.Test; +import org.apache.gobblin.service.monitoring.FlowStatus; + public class FlowExecutionResourceHandlerTest { + @Test + public void testConvertFlowStatusCoercesUnknownToPending() { + FlowStatus unknownFlowStatus = new FlowStatus("flowName", "flowGroup", 123L, + Collections.emptyIterator(), ExecutionStatus.$UNKNOWN); + + FlowExecution flowExecution = FlowExecutionResource.convertFlowStatus(unknownFlowStatus, false); + + Assert.assertNotNull(flowExecution); + Assert.assertEquals(flowExecution.getExecutionStatus(), ExecutionStatus.PENDING); + } + + @Test + public void testConvertFlowStatusPreservesValidStatus() { + FlowStatus runningFlowStatus = new FlowStatus("flowName", "flowGroup", 123L, + Collections.emptyIterator(), ExecutionStatus.RUNNING); + + FlowExecution flowExecution = FlowExecutionResource.convertFlowStatus(runningFlowStatus, false); + + Assert.assertNotNull(flowExecution); + Assert.assertEquals(flowExecution.getExecutionStatus(), ExecutionStatus.RUNNING); + } + @Test public void testEstimateCopyTimeLeftSanityCheck() { long currentTime = 10000;