def run()

in measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala [169:216]


    def run(): Unit = {
      val updateTimeDate = new Date()
      val updateTime = updateTimeDate.getTime
      println(s"===== [$updateTimeDate] process begins =====")
      val locked = lock.lock(5, TimeUnit.SECONDS)
      if (locked) {
        try {
          import org.apache.griffin.measure.utils.CommonUtils

          OffsetCheckpointClient.startOffsetCheckpoint()

          CommonUtils.timeThis({
            // start time
            val startTime = new Date().getTime

            val contextId = ContextId(startTime)

            // create dq context
            dqContext = globalContext.cloneDQContext(contextId)

            // build job
            dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)

            // dq job execute
            dqJob.execute(dqContext)

            // finish calculation
            finishCalculation(dqContext)

            // end time
          }, TimeUnit.MILLISECONDS)

          OffsetCheckpointClient.endOffsetCheckpoint()

          // clean old data
          cleanData(dqContext)

        } catch {
          case e: Throwable => error(s"process error: ${e.getMessage}")
        } finally {
          lock.unlock()
        }
      } else {
        println(s"===== [$updateTimeDate] process ignores =====")
      }
      val endTime = new Date().getTime
      println(s"===== [$updateTimeDate] process ends, using ${endTime - updateTime} ms =====")
    }