def doExecute()

in measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala [42:88]


  def doExecute(context: DQContext): Try[Boolean]

  def execute(context: DQContext): Try[Boolean] = {

    val threadName = Thread.currentThread().getName
    info(threadName + " begin transform step : \n" + debugString())

    // Submit parents Steps
    val parentStepFutures = parentSteps.filter(checkAndUpdateStatus).map { parentStep =>
      Future {
        val result = parentStep.execute(context)
        parentStep.synchronized {
          result match {
            case Success(_) => parentStep.status = COMPLETE
            case Failure(_) => parentStep.status = FAILED
          }
        }
        result
      }(TransformStep.transformStepContext)
    }

    val parentsResultSet = ThreadUtils.awaitResult(
      Future.sequence(parentStepFutures)(implicitly, TransformStep.transformStepContext),
      Duration.Inf)

    val parentsResult = parentsResultSet.foldLeft(Try(true)) { (ret, step) =>
      (ret, step) match {
        case (Success(_), nextResult) => nextResult
        case (Failure(_), _) => ret
      }
    }

    parentSteps.foreach(step => {
      while (step.status == RUNNING) {
        Thread.sleep(1000L)
      }
    })

    parentsResult match {
      case Success(_) =>
        info(threadName + " end transform step : \n" + debugString())
        doExecute(context)
      case Failure(e) =>
        error("Parent transform step failed: \n" + debugString(), e)
        parentsResult
    }
  }