private def executeMeasures()

in measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala [147:216]


  private def executeMeasures(
      input: DataFrame,
      dataSourceName: String,
      measureParams: Seq[MeasureParam],
      batchId: Long = -1L): Unit = {
    val numMeasures: Int = measureParams.length

    withCacheIfNecessary(
      dataSourceName,
      numMeasures,
      input, {
        import java.util.concurrent.TimeUnit

        import scala.concurrent.duration.Duration

        // define the tasks
        val tasks = (for (i <- measureParams.indices)
          yield {
            val measureParam = measureParams(i)
            val measure = createMeasure(measureParam)
            val measureName = measureParam.getName

            (measure, Future {
              info(s"Started execution of measure with name '$measureName'")

              val (recordsDf, metricsDf) = measure.execute(input)
              val currentContext = context.cloneDQContext(ContextId(new Date().getTime))

              persistMetrics(currentContext, measure, metricsDf)
              persistRecords(currentContext, measure, recordsDf)

              MetricFlushStep(Some(measureParam)).execute(currentContext)
            }(ec))
          }).toMap

        tasks.foreach(task => {
          val measureName = task._1.measureParam.getName

          task._2.onComplete {
            case Success(_) =>
              info(
                s"Successfully executed measure with name '$measureName' on data source with name " +
                  s"'$dataSourceName")
            case Failure(exception) =>
              error(
                s"Error occurred while executing measure with name '$measureName' on data source with name " +
                  s"'$dataSourceName'",
                exception)
          }(ec)
        })

        var deadline = Duration(10, TimeUnit.SECONDS).fromNow

        while (!tasks.forall(_._2.isCompleted)) {
          if (deadline.isOverdue()) {
            val unfinishedMeasureNames = tasks
              .filterNot(_._2.isCompleted)
              .map(_._1.measureParam.getName)
              .mkString("['", "', '", "']")

            info(s"Measures with name $unfinishedMeasureNames are still executing.")
            deadline = Duration(10, TimeUnit.SECONDS).fromNow
          }
        }

        info(
          "Completed execution of all measures for data source with " +
            s"name '${measureParams.head.getDataSource}'.")
      })
  }