in measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala [147:216]
/**
 * Runs every measure configured for a single data source concurrently and blocks the calling
 * thread until all of them have completed, whether successfully or with a failure.
 *
 * For each measure a `Future` is started on `ec` which executes the measure against `input`,
 * persists the resulting metrics and records through a freshly cloned DQ context, and then
 * runs a `MetricFlushStep`. Completion of each measure is logged; a failure of one measure is
 * logged and does not abort the others. While measures are still running, the names of the
 * unfinished ones are logged roughly every 10 seconds.
 *
 * NOTE(review): the whole block is handed to `withCacheIfNecessary`; presumably that helper
 * caches `input` for the duration of the block when several measures share it - confirm
 * against its definition.
 *
 * @param input          data frame the measures are executed on
 * @param dataSourceName name of the data source, used for logging and passed to
 *                       `withCacheIfNecessary`
 * @param measureParams  definitions of the measures to execute; may be empty
 * @param batchId        batch identifier; not referenced by this method body
 */
private def executeMeasures(
    input: DataFrame,
    dataSourceName: String,
    measureParams: Seq[MeasureParam],
    batchId: Long = -1L): Unit = {
  val numMeasures: Int = measureParams.length

  withCacheIfNecessary(
    dataSourceName,
    numMeasures,
    input, {
      import java.util.concurrent.{TimeoutException, TimeUnit}
      import scala.concurrent.Await
      import scala.concurrent.duration.Duration

      // Interval between "still executing" progress messages.
      val progressLogInterval = Duration(10, TimeUnit.SECONDS)

      // Define (and immediately start) the tasks. They are kept as a sequence of pairs rather
      // than a Map keyed by measure, so every started future stays tracked even if two
      // measure instances happen to compare equal.
      val tasks = measureParams.map { measureParam =>
        val measure = createMeasure(measureParam)
        val measureName = measureParam.getName
        val task = Future {
          info(s"Started execution of measure with name '$measureName'")
          val (recordsDf, metricsDf) = measure.execute(input)
          val currentContext = context.cloneDQContext(ContextId(new Date().getTime))
          persistMetrics(currentContext, measure, metricsDf)
          persistRecords(currentContext, measure, recordsDf)
          MetricFlushStep(Some(measureParam)).execute(currentContext)
        }(ec)
        (measure, task)
      }

      // Log the outcome of every measure as soon as it finishes.
      tasks.foreach {
        case (measure, task) =>
          val measureName = measure.measureParam.getName
          task.onComplete {
            case Success(_) =>
              info(
                s"Successfully executed measure with name '$measureName' on data source with name " +
                  s"'$dataSourceName'")
            case Failure(exception) =>
              error(
                s"Error occurred while executing measure with name '$measureName' on data source with name " +
                  s"'$dataSourceName'",
                exception)
          }(ec)
      }

      // Wait for all tasks. Instead of spinning on `isCompleted`, block on one pending future
      // at a time, but never past the next progress-log deadline.
      var deadline = progressLogInterval.fromNow
      var pending = tasks.filterNot(_._2.isCompleted)
      while (pending.nonEmpty) {
        if (deadline.isOverdue()) {
          val unfinishedMeasureNames = pending
            .map(_._1.measureParam.getName)
            .mkString("['", "', '", "']")
          info(s"Measures with name $unfinishedMeasureNames are still executing.")
          deadline = progressLogInterval.fromNow
        } else {
          // Await.ready does not rethrow the future's own failure; a timeout only means the
          // progress deadline was reached, which the next iteration handles.
          try Await.ready(pending.head._2, deadline.timeLeft)
          catch { case _: TimeoutException => () }
        }
        pending = pending.filterNot(_._2.isCompleted)
      }

      // `headOption` avoids a NoSuchElementException when no measures were supplied.
      val completedDataSource =
        measureParams.headOption.map(_.getDataSource).getOrElse(dataSourceName)
      info(
        "Completed execution of all measures for data source with " +
          s"name '$completedDataSource'.")
    })
}