static boolean isPriorFlowExecutionRunning()

in gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java [213:278]


  /**
   * Decides whether some other execution of the given flow should still be treated as running.
   *
   * <p>Every flow status returned by {@code dagManagementStateStore.getAllFlowStatusesForFlow} is examined:
   * <ul>
   *   <li>entries carrying the same {@code flowExecutionId} as the caller are logged and skipped;</li>
   *   <li>entries whose status is in {@code FlowStatusGenerator.FINISHED_STATUSES}, or is {@code $UNKNOWN}, are skipped;</li>
   *   <li>entries in {@code COMPILED}, {@code PENDING}, {@code PENDING_RESUME} or {@code RUNNING} are looked up in the
   *       state store; if the dag is still stored and its deadline has not passed, the flow counts as running;</li>
   *   <li>any other status is logged as unknown and skipped.</li>
   * </ul>
   *
   * @param flowGroup group of the flow being checked
   * @param flowName name of the flow being checked
   * @param flowExecutionId id of the execution on whose behalf the check is made; statuses with this id are ignored
   * @param dagManagementStateStore store queried for the flow statuses and for the dags of older executions
   * @return {@code true} as soon as one other execution is found in a non-terminal status, with a stored dag, and
   *         still inside its applicable deadline; {@code false} otherwise (including when there are no statuses)
   * @throws IOException declared by this method; presumably propagated from the state store calls - confirm
   */
  static boolean isPriorFlowExecutionRunning(String flowGroup, String flowName, long flowExecutionId, DagManagementStateStore dagManagementStateStore)
      throws IOException {
    List<FlowStatus> flowStatusList = dagManagementStateStore.getAllFlowStatusesForFlow(flowGroup, flowName);

    if (flowStatusList == null || flowStatusList.isEmpty()) {
      return false;
    }

    for (FlowStatus flowStatus : flowStatusList) {
      ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();

      if (flowStatus.getFlowExecutionId() == flowExecutionId) {
        // A duplicate call to this method indicates that the prior caller could not complete the required action,
        // so we ignore any flow status for the current execution to give the caller another chance to complete it.
        // This should be rare, so let's log it.
        if (flowExecutionStatus == COMPILED) {
          log.info("A previous execution with the same flowExecutionId found {}. Previous execution may not be "
              + "successfully submitted.", flowStatus);
        } else if (flowExecutionStatus == RUNNING) {
          log.error("A previous execution with the same flowExecutionId found {}. This is a rare case of previous "
              + "execution getting submitted but then LaunchDagProc failed to complete the lease", flowStatus);
        } else {
          log.warn("A previous execution with the same flowExecutionId and an unexpected status is found {}.", flowStatus);
        }
        continue;
      }

      log.debug("Verifying if {} is running...", flowStatus);

      if (FlowStatusGenerator.FINISHED_STATUSES.contains(flowExecutionStatus.name()) || flowExecutionStatus == $UNKNOWN) {
        // ignore finished entries
        // todo - make changes so `getAllFlowStatusesForFlow` never returns $UNKNOWN flow status
        continue;
      }

      // These are the only four non-terminal statuses that a flow can have. Jobs have two more non-terminal statuses,
      // ORCHESTRATED and PENDING_RETRY.
      boolean awaitingStart = flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING;
      boolean inProgress = flowExecutionStatus == RUNNING || flowExecutionStatus == PENDING_RESUME;
      if (!awaitingStart && !inProgress) {
        log.error("Unknown status {}", flowExecutionStatus);
        continue;
      }

      Dag.DagId dagIdOfOldExecution = new Dag.DagId(flowGroup, flowName, flowStatus.getFlowExecutionId());
      java.util.Optional<Dag<JobExecutionPlan>> dag = dagManagementStateStore.getDag(dagIdOfOldExecution);

      if (!dag.isPresent()) {
        log.error("Dag is finished and cleaned up, job status monitor somehow did not receive/update the flow status. Ignoring it here...");
        continue;
      }

      // The deadlines are read from the first node below; a dag without nodes has nothing to read them from
      // (and nothing to run), so skip it instead of failing with an IndexOutOfBoundsException.
      if (dag.get().getNodes().isEmpty()) {
        log.error("Dag {} has no nodes, so no deadline can be determined for it. Ignoring it here...", dagIdOfOldExecution);
        continue;
      }

      Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
      long flowStartTime = DagUtils.getFlowStartTime(dagNode);
      long jobStartDeadline =
          DagUtils.getJobStartDeadline(dagNode, DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis());
      long flowFinishDeadline = DagUtils.getFlowFinishDeadline(dagNode);

      // Read the clock once so the decision is made against a single instant, and compare elapsed time against
      // the deadline (rather than now against start + deadline) so a very large deadline cannot overflow.
      long elapsedSinceFlowStart = System.currentTimeMillis() - flowStartTime;
      // A flow that has not started yet is bound by the job start deadline; a started (or resuming) flow is bound
      // by the flow finish deadline.
      long applicableDeadline = awaitingStart ? jobStartDeadline : flowFinishDeadline;

      if (elapsedSinceFlowStart < applicableDeadline) {
        log.info("{} is still running. Found a dag for this, flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
            flowStatus, flowStartTime, jobStartDeadline, flowFinishDeadline);
        return true;
      }

      // Past its deadline: report it, but do not let it block the new execution.
      log.warn("Dag {} is still running beyond deadline! flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
          dagIdOfOldExecution, flowStartTime, jobStartDeadline, flowFinishDeadline);
    }

    return false;
  }