in measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala [37:53]
def doExecute(context: DQContext): Try[Boolean] =
Try {
val sparkSession = context.sparkSession
val df = rule match {
case DataFrameOps._fromJson => DataFrameOps.fromJson(sparkSession, inputDfName, details)
case DataFrameOps._accuracy =>
DataFrameOps.accuracy(sparkSession, inputDfName, context.contextId, details)
case DataFrameOps._clear => DataFrameOps.clear(sparkSession, inputDfName, details)
case _ => throw new Exception(s"df opr [ $rule ] not supported")
}
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
context.runTimeTableRegister.registerTable(name, df)
writeStepOpt match {
case Some(writeStep) => writeStep.execute(context)
case None => Try(true)
}
}.flatten