in measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala [42:88]
/**
 * Step-specific work, declared here without a body and supplied by implementations.
 *
 * `execute` calls this only after the parent steps it submitted have all returned
 * successfully.
 *
 * @param context data quality context, passed through unchanged from `execute`
 * @return outcome of the step wrapped in a `Try`
 *         (NOTE(review): the meaning of the Boolean payload is not visible here — confirm
 *         against the implementations)
 */
def doExecute(context: DQContext): Try[Boolean]
/**
 * Runs this transform step once its parent steps have finished.
 *
 * Parent steps for which `checkAndUpdateStatus` returns true are executed concurrently on
 * `TransformStep.transformStepContext`. The call blocks until all of them have returned and
 * then polls until no parent step reports `RUNNING` any more. `doExecute` is invoked only when
 * every parent submitted by this call succeeded.
 *
 * @param context data quality context, handed unchanged to parent steps and to `doExecute`
 * @return the result of `doExecute`, or the first failure among the parent steps submitted
 *         by this call
 */
def execute(context: DQContext): Try[Boolean] = {
  val threadName = Thread.currentThread().getName
  info(threadName + " begin transform step : \n" + debugString())

  // Submit parents Steps
  val parentStepFutures = parentSteps.filter(checkAndUpdateStatus).map { parentStep =>
    Future {
      // A parent may throw instead of returning a Failure. Capture that as a Failure so the
      // status below always leaves RUNNING; otherwise every thread polling this parent in the
      // wait loop further down would spin forever, and this method would throw instead of
      // returning a Failure.
      val result = Try(parentStep.execute(context)).flatten
      parentStep.synchronized {
        result match {
          case Success(_) => parentStep.status = COMPLETE
          case Failure(_) => parentStep.status = FAILED
        }
      }
      result
    }(TransformStep.transformStepContext)
  }

  // Block until every parent submitted above has produced a result.
  val parentsResultSet = ThreadUtils.awaitResult(
    Future.sequence(parentStepFutures)(implicitly, TransformStep.transformStepContext),
    Duration.Inf)

  // Keep the first failure if there is one; otherwise end up with a success.
  val parentsResult = parentsResultSet.foldLeft(Try(true)) { (ret, step) =>
    (ret, step) match {
      case (Success(_), nextResult) => nextResult
      case (Failure(_), _) => ret
    }
  }

  // Parents that were not submitted here may still be running elsewhere; wait for them too.
  // NOTE(review): the outcome of those parents is not folded into parentsResult, so a parent
  // that ends up FAILED under another caller does not stop doExecute below — confirm this is
  // intended.
  parentSteps.foreach { step =>
    while (step.status == RUNNING) {
      Thread.sleep(1000L)
    }
  }

  parentsResult match {
    case Success(_) =>
      info(threadName + " end transform step : \n" + debugString())
      doExecute(context)
    case Failure(e) =>
      error("Parent transform step failed: \n" + debugString(), e)
      parentsResult
  }
}