in measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala [169:216]
/**
 * Executes one processing round of the streaming data-quality job.
 *
 * The round only does work when `lock` can be obtained; otherwise it prints a
 * "process ignores" line and skips straight to the closing timing line. While
 * the lock is held it:
 *  1. starts the offset checkpoint,
 *  1. clones a fresh DQ context from `globalContext`, builds the DQ job from
 *     `evaluateRuleParam`, executes it and calls `finishCalculation`
 *     (all wrapped in `CommonUtils.timeThis`),
 *  1. ends the offset checkpoint,
 *  1. calls `cleanData` on the context used by this round.
 *
 * Anything thrown by those steps is caught and logged through `error`, together
 * with the full stack trace; it is not rethrown. The lock is always released in
 * the `finally` clause.
 *
 * NOTE(review): when a step fails, `endOffsetCheckpoint()` and `cleanData` are
 * skipped for that round because they sit after the failing call inside the same
 * `try` — confirm this is the intended behaviour.
 */
def run(): Unit = {
  val updateTimeDate = new Date()
  val updateTime = updateTimeDate.getTime
  println(s"===== [$updateTimeDate] process begins =====")
  // presumably waits up to 5 seconds for the lock and returns false on timeout — TODO confirm
  val locked = lock.lock(5, TimeUnit.SECONDS)
  if (locked) {
    try {
      import org.apache.griffin.measure.utils.CommonUtils
      OffsetCheckpointClient.startOffsetCheckpoint()
      CommonUtils.timeThis({
        // the context id is derived from the moment this round's calculation starts
        val startTime = new Date().getTime
        val contextId = ContextId(startTime)
        // create dq context
        dqContext = globalContext.cloneDQContext(contextId)
        // build job
        dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
        // dq job execute
        dqJob.execute(dqContext)
        // finish calculation
        finishCalculation(dqContext)
      }, TimeUnit.MILLISECONDS)
      OffsetCheckpointClient.endOffsetCheckpoint()
      // clean old data
      cleanData(dqContext)
    } catch {
      // Throwable is caught on purpose so that nothing thrown by the round's work escapes run().
      case e: Throwable =>
        // getMessage alone loses the exception type and is null for many exceptions
        // (e.g. a bare NullPointerException), so the stack trace is appended to the log entry.
        val trace = new java.io.StringWriter()
        val traceOut = new java.io.PrintWriter(trace)
        e.printStackTrace(traceOut)
        traceOut.flush()
        error(s"process error: ${e.getMessage}\n${trace.toString.trim}")
    } finally {
      lock.unlock()
    }
  } else {
    println(s"===== [$updateTimeDate] process ignores =====")
  }
  // printed whether or not the round actually ran
  val endTime = new Date().getTime
  println(s"===== [$updateTimeDate] process ends, using ${endTime - updateTime} ms =====")
}