Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,27 @@ public static FlowExecution convertFlowStatus(FlowStatus monitoringFlowStatus,

jobStatusArray.sort(Comparator.comparing((org.apache.gobblin.service.JobStatus js) -> js.getExecutionStatistics().getExecutionStartTime()));

// $UNKNOWN is a Pegasus in-memory sentinel (not declared in ExecutionStatus.pdl) that arises
// when no flow-level (NA/NA) status event was persisted for an execution. This can happen when
// the FlowSucceeded/FlowFailed GobblinTrackingEvent is emitted by the orchestrator but
// KafkaAvroJobStatusMonitor fails to persist it to the state store (e.g., Kafka consumer
// shard-rebalance / checkpoint issues). Serializing $UNKNOWN produces HTTP 500 and poisons the
// whole collection response. Coerce to PENDING so the record serializes; polling callers will
// keep polling until a terminal state becomes known.
ExecutionStatus flowExecutionStatus = monitoringFlowStatus.getFlowExecutionStatus();
if (flowExecutionStatus == ExecutionStatus.$UNKNOWN) {
log.warn("FlowExecution {}/{}/{} has $UNKNOWN flow status; coercing to PENDING. Check state store for data quality issue.",
flowId.getFlowGroup(), flowId.getFlowName(), monitoringFlowStatus.getFlowExecutionId());
flowExecutionStatus = ExecutionStatus.PENDING;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why PENDING over RUNNING as the coercion target? RUNNING might be a closer semantic match than PENDING, since the flow is clearly active enough for executions to appear in the collection response.

}

return new FlowExecution()
.setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
.setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
.setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
.setExecutionEndTime(flowEndTime))
.setMessage(flowMessage)
.setExecutionStatus(monitoringFlowStatus.getFlowExecutionStatus())
.setExecutionStatus(flowExecutionStatus)
.setJobStatuses(jobStatusArray)
.setIssues(new IssueArray(flowIssues));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,38 @@

package org.apache.gobblin.service;

import java.util.Collections;

import org.testng.Assert;
import org.testng.annotations.Test;

import org.apache.gobblin.service.monitoring.FlowStatus;


public class FlowExecutionResourceHandlerTest {

/**
 * Checks that {@code FlowExecutionResource.convertFlowStatus} reports {@code PENDING} when the
 * monitoring {@code FlowStatus} it is given carries {@code ExecutionStatus.$UNKNOWN}.
 */
@Test
public void testConvertFlowStatusCoercesUnknownToPending() {
  // A flow status with no job statuses (empty iterator) whose flow-level status is $UNKNOWN.
  FlowStatus statusWithUnknownState =
      new FlowStatus("flowName", "flowGroup", 123L, Collections.emptyIterator(), ExecutionStatus.$UNKNOWN);

  FlowExecution converted = FlowExecutionResource.convertFlowStatus(statusWithUnknownState, false);

  // The conversion must still yield a record, and that record must expose PENDING, not $UNKNOWN.
  Assert.assertNotNull(converted);
  ExecutionStatus reportedStatus = converted.getExecutionStatus();
  Assert.assertEquals(reportedStatus, ExecutionStatus.PENDING);
}

@Test
public void testConvertFlowStatusPreservesValidStatus() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a terminal-status test case to testConvertFlowStatusPreservesValidStatus?

FlowStatus runningFlowStatus = new FlowStatus("flowName", "flowGroup", 123L,
Collections.emptyIterator(), ExecutionStatus.RUNNING);

FlowExecution flowExecution = FlowExecutionResource.convertFlowStatus(runningFlowStatus, false);

Assert.assertNotNull(flowExecution);
Assert.assertEquals(flowExecution.getExecutionStatus(), ExecutionStatus.RUNNING);
}

@Test
public void testEstimateCopyTimeLeftSanityCheck() {
long currentTime = 10000;
Expand Down