in gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java [213:278]
/**
 * Checks whether an execution of the given flow, other than {@code flowExecutionId}, is still in progress.
 *
 * <p>Another execution is reported as running only when all of the following hold:
 * <ul>
 *   <li>its flow status is one of the non-terminal statuses {@code COMPILED}, {@code PENDING},
 *       {@code PENDING_RESUME} or {@code RUNNING};</li>
 *   <li>its dag is still present (and non-empty) in {@code dagManagementStateStore};</li>
 *   <li>the time elapsed since its flow start time is below its deadline: the job start deadline for
 *       {@code COMPILED}/{@code PENDING}, or the flow finish deadline for {@code RUNNING}/{@code PENDING_RESUME}.</li>
 * </ul>
 * Flow statuses that belong to {@code flowExecutionId} itself are only logged and then skipped, so a repeated call
 * for the same execution is not blocked by its own earlier attempt.
 *
 * @param flowGroup flow group of the flow to check
 * @param flowName flow name of the flow to check
 * @param flowExecutionId execution id of the current execution; statuses with this id are excluded from the check
 * @param dagManagementStateStore store used to look up the flow statuses and dags of the flow
 * @return {@code true} if some other execution of the flow is still running within its deadline,
 *         {@code false} otherwise (including when no flow statuses are found)
 * @throws IOException propagated from the {@code dagManagementStateStore} lookups
 */
static boolean isPriorFlowExecutionRunning(String flowGroup, String flowName, long flowExecutionId,
    DagManagementStateStore dagManagementStateStore) throws IOException {
  List<FlowStatus> flowStatusList = dagManagementStateStore.getAllFlowStatusesForFlow(flowGroup, flowName);
  if (flowStatusList == null || flowStatusList.isEmpty()) {
    return false;
  }
  for (FlowStatus flowStatus : flowStatusList) {
    ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
    if (flowStatus.getFlowExecutionId() == flowExecutionId) {
      // A duplicate call to this method indicates that the prior caller of this method could not complete the
      // required action, so we ignore any flow status for the current execution to give the caller another chance
      // to complete it. This should be rare, so log it.
      if (flowExecutionStatus == COMPILED) {
        log.info("A previous execution with the same flowExecutionId found {}. Previous execution may not be "
            + "successfully submitted.", flowStatus);
      } else if (flowExecutionStatus == RUNNING) {
        log.error("A previous execution with the same flowExecutionId found {}. This is a rare case of previous "
            + "execution getting submitted but then LaunchDagProc failed to complete the lease", flowStatus);
      } else {
        log.warn("A previous execution with the same flowExecutionId and an unexpected status is found {}.", flowStatus);
      }
      continue;
    }
    log.debug("Verifying if {} is running...", flowStatus);
    if (FlowStatusGenerator.FINISHED_STATUSES.contains(flowExecutionStatus.name()) || flowExecutionStatus == $UNKNOWN) {
      // ignore finished entries
      // todo - make changes so `getAllFlowStatusesForFlow` never returns $UNKNOWN flow status
    } else if (flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING
        || flowExecutionStatus == PENDING_RESUME || flowExecutionStatus == RUNNING) {
      // these are the only four non-terminal statuses that a flow can have. jobs have two more non-terminal statuses
      // ORCHESTRATED and PENDING_RETRY
      Dag.DagId dagIdOfOldExecution = new Dag.DagId(flowGroup, flowName, flowStatus.getFlowExecutionId());
      java.util.Optional<Dag<JobExecutionPlan>> dag = dagManagementStateStore.getDag(dagIdOfOldExecution);
      if (!dag.isPresent()) {
        log.error("Dag {} is finished and cleaned up, job status monitor somehow did not receive/update the flow "
            + "status. Ignoring it here...", dagIdOfOldExecution);
        continue;
      }
      if (dag.get().getNodes().isEmpty()) {
        // Without a node there is no flow start time or deadline to evaluate; skip instead of failing with
        // IndexOutOfBoundsException on get(0).
        log.error("Dag {} has no nodes, cannot evaluate its deadlines. Ignoring it here...", dagIdOfOldExecution);
        continue;
      }
      Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
      long flowStartTime = DagUtils.getFlowStartTime(dagNode);
      long jobStartDeadline =
          DagUtils.getJobStartDeadline(dagNode, DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis());
      long flowFinishDeadline = DagUtils.getFlowFinishDeadline(dagNode);
      // Not-yet-started executions are bounded by the job start deadline; started ones by the flow finish deadline.
      long applicableDeadline = (flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING)
          ? jobStartDeadline
          : flowFinishDeadline;
      // Compare elapsed time against the deadline instead of `now < flowStartTime + deadline`: the sum can overflow
      // (and wrap negative) when a very large deadline such as Long.MAX_VALUE is configured.
      long elapsedMillis = System.currentTimeMillis() - flowStartTime;
      if (elapsedMillis < applicableDeadline) {
        log.info("{} is still running. Found a dag for this, flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
            flowStatus, flowStartTime, jobStartDeadline, flowFinishDeadline);
        return true;
      } else {
        log.warn("Dag {} is still running beyond deadline! flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
            dagIdOfOldExecution, flowStartTime, jobStartDeadline, flowFinishDeadline);
      }
    } else {
      log.error("Unknown status {}", flowExecutionStatus);
    }
  }
  return false;
}