def preProcess()

in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala [56:93]


  def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
    // new context
    val context = createContext(ms)

    val timestamp = context.contextId.timestamp
    val thisTable = dcParam.getDataFrameName("this")
    try {
      saveTmst(timestamp) // save timestamp

      val processedDf = dfOpt match {
        case Some(df) =>
          context.compileTableRegister.registerTable(thisTable)

          dcParam.getPreProcRules.foldLeft(df)((dataFrame, rule) => {
            Try {
              context.runTimeTableRegister.registerTable(thisTable, dataFrame)

              sparkSession.sql(rule)
            } match {
              case Success(value) => value
              case Failure(exception) =>
                val errorMsg =
                  s"Exception occurred while preprocessing dataset with name '$thisTable'"
                error(errorMsg, exception)
                throw exception
            }
          })
        case None => null
      }

      Option(processedDf)
        .map(_.withColumn(ConstantColumns.tmst, lit(timestamp)))
    } catch {
      case e: Throwable =>
        error(s"pre-process of data connector [$id] error: ${e.getMessage}", e)
        None
    }
  }