in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala [56:93]
/**
 * Applies the configured pre-processing rules to an optional DataFrame and tags the
 * result with the timestamp of a freshly created context.
 *
 * The context timestamp is saved through `saveTmst` before anything else, even when
 * `dfOpt` is empty. Every rule returned by `dcParam.getPreProcRules` is then executed
 * in order through `sparkSession.sql`; before each rule the current intermediate
 * DataFrame is registered under the "this" table name, so a rule operates on the
 * output of the rule before it.
 *
 * @param dfOpt the DataFrame to pre-process, or `None` when there is nothing to process
 * @param ms    value handed to `createContext` to build the new context
 *              (presumably epoch milliseconds; confirm against callers)
 * @return the processed DataFrame with an extra `ConstantColumns.tmst` column holding
 *         the context timestamp, or `None` when `dfOpt` is empty or a non-fatal error
 *         occurs while saving the timestamp or running a rule
 */
def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
  // new context
  val context = createContext(ms)
  val timestamp = context.contextId.timestamp
  val thisTable = dcParam.getDataFrameName("this")
  try {
    saveTmst(timestamp) // save timestamp
    dfOpt
      .flatMap { df =>
        context.compileTableRegister.registerTable(thisTable)
        val processedDf = dcParam.getPreProcRules.foldLeft(df) { (dataFrame, rule) =>
          Try {
            // re-register the intermediate result so the next rule can query it
            context.runTimeTableRegister.registerTable(thisTable, dataFrame)
            sparkSession.sql(rule)
          } match {
            case Success(value) => value
            case Failure(exception) =>
              val errorMsg =
                s"Exception occurred while preprocessing dataset with name '$thisTable'"
              error(errorMsg, exception)
              // abort the remaining rules; the outer handler turns this into None
              throw exception
          }
        }
        // Option(...) keeps the original guard against a null DataFrame
        Option(processedDf)
      }
      .map(_.withColumn(ConstantColumns.tmst, lit(timestamp)))
  } catch {
    // Only recoverable failures are converted to None; fatal errors
    // (OutOfMemoryError, InterruptedException, ...) are allowed to propagate.
    case scala.util.control.NonFatal(e) =>
      error(s"pre-process of data connector [$id] error: ${e.getMessage}", e)
      None
  }
}