def createProcessor()

in DataProcessing/datax-host/src/main/scala/datax/processor/CommonProcessorFactory.scala [50:657]


  def createProcessor(config: UnifiedConfig):CommonProcessor = {
    val sparkConf = config.sparkConf
    val spark = appHost.getSpark(sparkConf)

    val dict = config.dict
    import spark.implicits._

    // Load and initialize functions in parallel to be used in streaming iterations.
    val loadings = Tuple5(
      Future{SchemaFile.loadInputSchema(dict)},
      ProjectionHandler.loadProjectionsFuture(dict),
      TransformHandler.loadTransformFuture(dict),
      ReferenceDataHandler.loadReferenceDataFuture(spark, dict),
      Future {ExtendedUDFHandler.initialize(spark, dict)}
    )
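    // the Await below blocks until all five loadings complete (bounded at 10 minutes) before the first batch is processed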

    val (rawBlockSchema, projections, sqlParsed, referencedDataLoaded, udfs) =
      Await.result(for {
        r1 <- loadings._1
        r2 <- loadings._2
        r3 <- loadings._3
        r4 <- loadings._4
        r5 <- loadings._5
      } yield (r1, r2, r3, r4, r5), 10 minutes)

    BuiltInFunctionsHandler.initialize(spark, dict)
    ProjectionHandler.validateProjections(projections)
    JarUDFHandler.loadJarUdf(spark, dict)
    AzureFunctionHandler.initialize(spark, dict)
    UdfInitializer.initialize(spark, dict)
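    // create the state-store tables declared in the settings; their active table names are registered for each batch inside route() below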
    val createdTables = StateTableHandler.createTables(spark, dict)
    val inputNormalizer = InputNormalizerHandler.initialize(spark, dict)
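    // fall back to an identity UDF when no input normalizer is configured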
    val inputNormalizerUdf = if(inputNormalizer==null) udf((s:String)=>s) else udf(inputNormalizer)
    val preProjection = PreProjectionHandler.initialize(spark, dict)
    val buildPropertiesUdf = PropertiesHandler.initialize(spark, dict)

    /*
      function to parse the raw input string into a Raw object column based on the input raw blob schema,
      and to project the data frame based on the columns from the projection files
     */
    def project(inputDf: DataFrame, batchTime: Timestamp): DataFrame = {
      // Initial schema and data set
      var df = inputDf
        .withColumn(ColumnName.RawObjectColumn, from_json(inputNormalizerUdf(col(ColumnName.RawObjectColumn)), rawBlockSchema))

      df = if(preProjection==null)df else preProjection(df)
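      // carry internal bookkeeping columns (those with the internal prefix) through every projection step below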
      val preservedColumns = df.schema.fieldNames.filter(_.startsWith(ColumnName.InternalColumnPrefix))
      df = df.withColumn(ColumnName.PropertiesColumn, buildPropertiesUdf(col(ColumnName.InternalColumnFileInfo), lit(batchTime)))

      for(step <- 0 until projections.length)
        df = df.selectExpr(projections(step)++preservedColumns: _*)

      df
    }

    // initialize metrics settings
    val metricAppName = dict.getMetricAppName()
    val metricConf = MetricsHandler.getMetricsSinkConf(dict)

    // figure out how to output
    val outputs = OutputManager.getOperatators(dict)
    val outputCount = outputs.size

    // initialize output handlers
    outputs.par.foreach(o => {
      if (o.onInitialization != null)
        o.onInitialization(spark)
    })

    // initialize settings of time windows
    val timewindows = TimeWindowHandler.initialize(spark, dict)

    // store past RDDs for overlapping time windows
    val pastRdds = new HashMap[Timestamp, RDD[Row]]()

    // store names of data frames for outputting
    val dataframes = new HashMap[String, DataFrame]

    /*
      function to execute the queries for transforming data frames and output them accordingly
     */
    def route(projectedDf: DataFrame, batchTime: Timestamp, batchInterval: Duration, outputPartitionTime: Timestamp, targets: Set[String], tableNamespace:String) = {
      val transformLogger = LogManager.getLogger("Transformer")

      // store the mapping between table names used in the queries and the actual data frame names we are processing
      val tableNames = new HashMap[String, String]

      // register the start input table for the query
      val initTableName = DatasetName.DataStreamProjection+tableNamespace
      tableNames(DatasetName.DataStreamProjection) = initTableName
      dataframes += initTableName -> projectedDf

      // store the data frames that we should unpersist after one iteration
      val dataFramesToUncache = new ListBuffer[DataFrame]
      transformLogger.warn("Persisting the current projected dataframe")
      projectedDf.persist()
      dataFramesToUncache += projectedDf

      // store the metrics to send after one iteration
      val outputMetrics = HashMap[String, Double]()

      // log metric of how many events are incoming on this iteration
      val inputRawEventsCount = projectedDf.count()
      transformLogger.warn(s"Received $inputRawEventsCount raw input events")
      outputMetrics += s"Input_Normalized_Events_Count" -> inputRawEventsCount

      if(timewindows.isEnabled){
        // when time windowing is turned on, we calculate the start and end time of the window and query against
        // the past RDDs kept from previous batches
        // the overall window is the maximum time span across the time windows passed in from the settings
        // the end-time filter below subtracts the watermark span, which is the buffer that allows late events to finalize
        val windowEndTime = Timestamp.from(batchTime.toInstant.minusSeconds(timewindows.watermark.toSeconds))
        // this one below determines the start-time filter, which is the earliest start time across all time windows (the end time minus the widest window span)
        val windowStartTime = Timestamp.from(windowEndTime.toInstant.minusSeconds(timewindows.maxWindow.toSeconds))
        val rdd = projectedDf.where(timewindows.timestampColumn>=windowEndTime).rdd

        transformLogger.warn("Persisting the windowed projected data frame")
        rdd.persist(StorageLevel.MEMORY_ONLY_SER)

        // log a metric of how many events remain after the time-window filter and actually participate in the transformation
        val inputEventsCount = rdd.mapPartitions(it=>{
          val loggerSuffix = SparkEnvVariables.getLoggerSuffix()
          val instrumentLogger = LogManager.getLogger(s"${ProductConstant.ProductInstrumentLogger}$loggerSuffix")
          val t1 = System.nanoTime()
          instrumentLogger.warn(s"Start collecting events at $t1")
          val count = it.toArray.length
          val timeNow = System.nanoTime()
          instrumentLogger.warn(s"Collected $count events for caching, spent time=${(timeNow-t1)/1E9} seconds")
          Iterator.single(count)
        }).reduce(_+_)

        transformLogger.warn(s"Received $inputEventsCount input events for ${initTableName}")
        outputMetrics += s"Input_${DatasetName.DataStreamProjection}_Events_Count" -> inputEventsCount

        // collect data from past RDDs that fits in the time window
        val cutTime = Timestamp.from(batchTime.toInstant.minusSeconds((timewindows.watermark+timewindows.maxWindow).toSeconds))
        pastRdds.keys.filter(_.compareTo(cutTime)<=0).foreach(k=>{
          pastRdds.remove(k) match {
            case Some(rdd) =>
              transformLogger.warn(s"removing past RDD at ${k} since it is before or equal to ${cutTime}")
              rdd.unpersist(false)
            case None =>
              transformLogger.warn(s"Unexpectedly ${k} does exist in the pastRDDs")
          }
        })

        // union the data from current projected data frame and the past ones
        val sc = rdd.sparkContext
        val pastDataUnion = spark.createDataFrame(if(pastRdds.size>1){
          transformLogger.warn(s"union ${pastRdds.size} batches, including ${pastRdds.keySet.mkString(",")}")
          sc.union(rdd, pastRdds.values.toSeq: _*)
        } else rdd, projectedDf.schema)

        val unionTableNameInSql = DatasetName.DataStreamProjectionWithWindow
        val unionTableName = unionTableNameInSql+tableNamespace
        pastDataUnion
          .where(timewindows.timestampColumn>=windowStartTime && timewindows.timestampColumn<windowEndTime)
          .createOrReplaceTempView(unionTableName)
        tableNames(unionTableNameInSql) = unionTableName
        dataframes(unionTableName)=spark.table(unionTableName)

        // register time-windowed tables and their corresponding data frames for different time window spec
        for (tw <- timewindows.windows) {
          val winTableName = tw._1
          val winTableNameInScope = winTableName + tableNamespace
          val winStartTime = Timestamp.from(windowEndTime.toInstant.minusSeconds(tw._2.toSeconds))
          transformLogger.warn(s"Create or replace time windowed view '${winTableNameInScope}' within window('$winStartTime' - '$windowEndTime')")
          pastDataUnion
            .where(timewindows.timestampColumn>=winStartTime && timewindows.timestampColumn<windowEndTime)
            .createOrReplaceTempView(winTableNameInScope)
          tableNames(winTableName) = winTableNameInScope
          dataframes(winTableNameInScope)=spark.table(winTableNameInScope)
        }

        // replace the starting table
        val adjustedBatchStartTime = Timestamp.from(windowEndTime.toInstant.minusSeconds(batchInterval.toSeconds))
        val cachedProjectedDf = pastDataUnion.where(timewindows.timestampColumn>=adjustedBatchStartTime && timewindows.timestampColumn<windowEndTime)
        cachedProjectedDf.createOrReplaceTempView(initTableName)

        // register a table to reference to the projected data frame within only the current iteration batch
        val batchedTableName = DatasetName.DataStreamProjectionBatch + tableNamespace
        projectedDf.createOrReplaceTempView(batchedTableName)
        tableNames(DatasetName.DataStreamProjectionBatch) = batchedTableName
        dataframes(batchedTableName)=projectedDf

        // keep the current batch's RDD so later batches can union it into their time windows
        pastRdds(batchTime) = rdd
      }
      else{
        // if time windowing is not turned on, we simply register the projected data frame as the starting input table for the queries
        outputMetrics += s"Input_${DatasetName.DataStreamProjection}_Events_Count" -> inputRawEventsCount
        projectedDf.createOrReplaceTempView(initTableName)
      }

      // register state-store tables
      for (elem <- createdTables) {
        tableNames(elem._1)=elem._2.getActiveTableName()
      }

      // start executing queries
      if(sqlParsed!=null && sqlParsed.commands.length>0){
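        // capture the input partition count so the derived views below can be coalesced to match it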
        val partitionNumber = projectedDf.rdd.getNumPartitions
        val queries = sqlParsed.commands
        queries.foreach(expr=>{
          val statement = TransformSQLParser.replaceTableNames(expr.text, tableNames)
          expr.commandType match {
            case TransformSQLParser.CommandType_Command =>
              transformLogger.warn(s"Executing command '$statement'")
              spark.sql(statement)
            case TransformSQLParser.CommandType_Query =>
              createdTables.find(_._1 == expr.name) match {
                case Some(t) =>
                  // this case is a query statement that assigns a data set back to a registered state-store table,
                  // so we have to overwrite the existing state-store table with the new data
                  t._2.overwrite(statement)
                  tableNames(t._1) = t._2.flip()
                case None =>
                  // this is the normal case where no state-store table is involved
                  val tableName = expr.name + tableNamespace
                  transformLogger.warn(s"Creating view '$tableName' for '$statement'")

                  val ds = if(partitionNumber > 0) {
                    spark.sql(statement).coalesce(partitionNumber)
                  }
                  else {
                    transformLogger.warn(s"Zero events found for $tableName' for '$statement'")
                    spark.sql(statement)
                  }

                  tableNames(expr.name) = tableName
                  dataframes(tableName) = ds

                  // cache data frames that are referenced more than once to improve performance
                  if(TransformHandler.shouldCacheCommonViews(dict) && sqlParsed.viewReferenceCount.getOrElse(expr.name, 0)>1){
                    transformLogger.warn(s"Caching view '$tableName' for it would be used more than once")
                    ds.cache()
                    dataFramesToUncache += ds
                  }

                  ds.createOrReplaceTempView(tableName)
              }
            case _ =>
              throw new EngineException(s"unknown commandType : ${expr.commandType}")
          }
        })
      }

      // start outputting data
      def outputHandler(operator: OutputOperator) = {
        val tableName = operator.name
        val outputTableName = tableName+tableNamespace
        dataframes.get(outputTableName) match {
          case None => throw new EngineException(s"could not find data set name '$outputTableName' for output '${operator.name}'")
          case Some(df) =>
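            // run the operator's per-batch hook if present, then write the data frame and namespace its returned counters as metrics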
            if (operator.onBatch != null) operator.onBatch(df.sparkSession, outputPartitionTime, targets)
            operator.output(df, outputPartitionTime).map { case (k, v) => (s"Output_${operator.name}_" + k) -> v.toDouble }
        }
      }

      var result = Map.empty[String,Double]
      if(outputCount>0) {
        // if there are multiple outputs, we kick off them in parallel
        result = if (outputCount > 1)
          outputs.par.map(outputHandler).reduce(_ ++ _)
        else
          outputHandler(outputs(0))
      }

      // persisting state-store tables
      for (elem <- createdTables) {
        elem._2.persist()
      }

      // clear cache of the data frames in this batch of iteration
      transformLogger.warn("Un-persisting the dataframes")
      dataFramesToUncache.foreach(_.unpersist(false))
      dataFramesToUncache.clear()

      outputMetrics ++ result
    }

    /*
      function to process the unified data frame, which has 4 columns: the raw string input, Properties, SystemProperties and an internal metadata column for processing
     */
    def processDataset(data: DataFrame,
                       batchTime: Timestamp,
                       batchInterval: Duration,
                       outputPartitionTime: Timestamp,
                       targets: Set[String],
                       namespace: String):Map[String, Double] = {
      val t1 = System.nanoTime()
      val batchLogger = LogManager.getLogger(ProductConstant.DataStreamProcessDataSetLogger)
      val metricLogger = MetricLoggerFactory.getMetricLogger(metricAppName, metricConf)
      val spark = data.sparkSession

      def postMetrics(metrics: Iterable[(String, Double)]): Unit = {
        batchLogger.warn(s"Sending metrics:\n${metrics.map(m => m._1 + " -> " + m._2).mkString("\n")}")
        metricLogger.sendBatchMetrics(metrics, batchTime.getTime)
      }

      try{
        // call ExtendedUDFs to refresh their data
        udfs.foreach(udf=>{
          if(udf._2!=null)udf._2(spark, batchTime)
        })

        // if the raw input is specified as one of the outputs in the output settings, we cache it and register it so it can be output
        val persistRaw = outputs.find(p=>p.name==DatasetName.DataStreamRaw).isDefined
        if(persistRaw){
          data.cache()
          dataframes(DatasetName.DataStreamRaw) = data
        }

        // main processing steps
        val baseProjection = project(data, batchTime)
        val counts = route(baseProjection, batchTime, batchInterval, outputPartitionTime, targets, namespace)

        // clear the cache of raw input table if needed.
        if(persistRaw){
          data.unpersist(false)
        }

        // calculate performance metrics
        val partitionProcessedTime = System.nanoTime
        val latencyInSeconds = (DateTimeUtil.getCurrentTime().getTime - batchTime.getTime)/1000D
        val metrics = Map[String, Double](
          "Latency-Process" -> (partitionProcessedTime - t1) / 1E9,
          "Latency-Batch" -> latencyInSeconds
        ) ++ counts

        postMetrics(metrics)
        metrics
      }
      catch{
        case e: Exception =>
          appHost.getTelemetryService().trackEvent(ProductConstant.ProductRoot + "/error", Map(
            "errorLocation" -> "ProcessDataFrame",
            "errorMessage" -> e.getMessage,
            "errorStackTrace" -> e.getStackTrace.take(10).mkString("\n"),
            "batchTime" -> batchTime.toString
          ), null)
          appHost.getTelemetryService().trackException(e, Map(
            "errorLocation" -> "ProcessDataFrame",
            "errorMessage" -> e.getMessage,
            "batchTime" -> batchTime.toString
          ), null)

          Thread.sleep(1000)
          throw e
      }
    }

    CommonProcessor(
      /*
        process a batch of EventData from EventHub
       */
      processEventData = (rdd: RDD[EventData], batchTime: Timestamp, batchInterval: Duration, outputPartitionTime: Timestamp) =>{
        processDataset(rdd
          .map(d=>{
            val bodyBytes = d.getBytes
            if(bodyBytes==null) throw new EngineException(s"null bytes from event: ${d.getObject}, properties:${d.getProperties}, systemProperties:${d.getSystemProperties}")
            (
              new String(bodyBytes),
              d.getProperties.asScala.map{case(k,v)=>k->v.toString},
              if(d.getSystemProperties!=null) d.getSystemProperties.asScala.map{case(k,v)=>k->v.toString} else Map.empty[String, String],
              FileInternal())
          })
          .toDF(
            ColumnName.RawObjectColumn,
            ColumnName.RawPropertiesColumn,
            ColumnName.RawSystemPropertiesColumn,
            ColumnName.InternalColumnFileInfo
          ), batchTime, batchInterval, outputPartitionTime, null, "")
      },

      /*
        process structured streaming for given data frame
        Note this is incomplete and not used for now
       */
      processEventHubDataFrame = (df: DataFrame) => {
        val logger = LogManager.getLogger("processEventHubDataFrame")
        df
          .select(
            from_json(col("body").cast("string"), rawBlockSchema).alias("Raw"),
            col("properties"),
            col("enqueuedTime")
          )
          .selectExpr("Raw.*", "properties", "enqueuedTime")
          .withWatermark("enqueuedTime", "60 seconds")
          .createOrReplaceTempView(DatasetName.DataStreamProjection)
        val outputs = sqlParsed.commands
          .filter(n=>n.commandType==TransformSQLParser.CommandType_Query).map(n=>n.name->n.text)
          .toMap

        val streamingConf = StreamingInputSetting.getStreamingInputConf(dict)
        val interval = streamingConf.intervalInSeconds

        outputs.map{case(k, v)=>{
          k-> spark.sql(v).writeStream
            .outputMode(OutputMode.Append())
            .format("console")
            .trigger(Trigger.ProcessingTime(interval, SECONDS))
            .start()
        }}
      },

      /*
        process json data frame
       */
      processJson = (jsonRdd: RDD[String], batchTime: Timestamp, batchInterval: Duration, outputPartitionTime: Timestamp) =>{
        processDataset(jsonRdd.map((FileInternal(), _)).toDF(ColumnName.InternalColumnFileInfo, ColumnName.RawObjectColumn),
          batchTime, batchInterval, outputPartitionTime, null, "")
      },
      // process blob paths from the batch blob input
      processBatchBlobPaths = (pathsRDD: RDD[String],
                      batchTime: Timestamp,
                      batchInterval: Duration,
                      outputPartitionTime: Timestamp,
                      namespace: String) => {


        val spark = SparkSessionSingleton.getInstance(pathsRDD.sparkContext.getConf)

        val metricLogger = MetricLoggerFactory.getMetricLogger(metricAppName, metricConf)
        val batchTimeStr = DateTimeUtil.formatSimple(batchTime)
        val batchLog = LogManager.getLogger(s"BatchProcessor-B$batchTimeStr")
        val batchTimeInMs = batchTime.getTime

        def postMetrics(metrics: Iterable[(String, Double)]): Unit = {
          metricLogger.sendBatchMetrics(metrics, batchTime.getTime)
          batchLog.warn(s"Metric ${metrics.map(m => m._1 + "=" + m._2).mkString(",")}")
        }

        batchLog.warn(s"Start batch ${batchTime}, output partition time:${outputPartitionTime}, namespace:${namespace}")
        val t1 = System.nanoTime

        // Wrap each input file path into a FileInternal object
        val internalFiles = pathsRDD.map(file => {
          FileInternal(inputPath = file,
            outputFolders = null,
            outputFileName = null,
            fileTime = null,
            ruleIndexPrefix = "",
            target = null
          )
        })

        val paths = internalFiles.map(_.inputPath).collect()
        postMetrics(Map(s"InputBlobs" -> paths.size.toDouble))
        batchLog.warn(s"InputBlob count=${paths.size}");

        // broadcast the blob storage key derived from the first input path for use when reading the files below
        val blobStorageKey = ExecutorHelper.createBlobStorageKeyBroadcastVariable(paths.head, spark)

        val inputDf = internalFiles
                      .flatMap(file => HadoopClient.readHdfsFile(file.inputPath, gzip = file.inputPath.endsWith(".gz"), blobStorageKey)
                      .filter(l=>l!=null && !l.isEmpty).map((file, outputPartitionTime, _)))
                      .toDF(ColumnName.InternalColumnFileInfo, ColumnName.MetadataColumnOutputPartitionTime, ColumnName.RawObjectColumn)

        val processedMetrics = processDataset(inputDf, batchTime, batchInterval, outputPartitionTime, null, "")

        val batchProcessingTime = (System.nanoTime - t1) / 1E9

        val metrics = Map[String, Double](
          "BatchProcessedET" -> batchProcessingTime
        )
        processedMetrics ++ metrics
      },

      // process blob path pointer data frame
      processPaths = (pathsRDD: RDD[String],
                      batchTime: Timestamp,
                      batchInterval: Duration,
                      outputPartitionTime: Timestamp,
                      namespace: String) => {
        val spark = SparkSessionSingleton.getInstance(pathsRDD.sparkContext.getConf)

        val metricLogger = MetricLoggerFactory.getMetricLogger(metricAppName, metricConf)
        val batchTimeStr = DateTimeUtil.formatSimple(batchTime)
        val batchLog = LogManager.getLogger(s"BatchProcessor-B$batchTimeStr")

        // Functions used within processPaths
        val batchTimeInMs = batchTime.getTime

        def postMetrics(metrics: Iterable[(String, Double)]): Unit = {
          metricLogger.sendBatchMetrics(metrics, batchTimeInMs)
          batchLog.warn(s"Metric ${metrics.map(m => m._1 + "=" + m._2).mkString(",")}")
        }

        // Process the array of input files, and sink them
        // Return metrics: (number of processed blobs, number of processed events, number of filtered events sent to eventhub)
        def processBlobs(files: Array[FileInternal],
                         outputPartitionTime: Timestamp,
                         partition: String,
                         targetPar: String): Map[String, Double] = {
          val filesCount = files.length
          val t1 = System.nanoTime()

          // Get the earliest blob to calculate latency
          val paths = files.map(_.inputPath)
          val blobTimes = files.map(_.fileTime).filterNot(_ == null).toList

          postMetrics(Map(s"InputBlobs" -> filesCount.toDouble))

          val (minBlobTime, maxBlobTime) =
            if(blobTimes.length>0) {
              val minBlobTime = blobTimes.minBy(_.getTime)
              val maxBlobTime = blobTimes.maxBy(_.getTime)
              batchLog.warn(s"partition '$partition': started, size: $filesCount, blob time range[${DateTimeUtil.formatSimple(minBlobTime)}, ${DateTimeUtil.formatSimple(maxBlobTime)}]")
              (minBlobTime, maxBlobTime)
            }
            else{
              batchLog.warn(s"Cannot figure out timestamp from file name, please check if there is misconfiguration in the fileTimeRegex setting")
              (null, null)
            }

          val pathsList = paths.mkString(",")
          batchLog.debug(s"Batch loading files:$pathsList")
          val inputDf = spark.sparkContext.parallelize(files, filesCount)
            .flatMap(file => HadoopClient.readHdfsFile(file.inputPath, gzip = file.inputPath.endsWith(".gz"))
              .filter(l=>l!=null && !l.isEmpty).map((file, outputPartitionTime, _)))
            .toDF(ColumnName.InternalColumnFileInfo, ColumnName.MetadataColumnOutputPartitionTime, ColumnName.RawObjectColumn)

          val targets = files.map(_.target).toSet
          val processedMetrics = processDataset(inputDf, batchTime, batchInterval, outputPartitionTime, targets, partition)
          if(minBlobTime!=null){
            val latencyInSeconds = (DateTimeUtil.getCurrentTime().getTime - minBlobTime.getTime)/1000D
            val latencyMetrics = Map(s"Latency-Blobs" -> latencyInSeconds)
            postMetrics(latencyMetrics)
            latencyMetrics++processedMetrics
          }
          else{
            processedMetrics
          }
        }

        def processPartition(v: (String, HashSet[FileInternal])) = {
          val par = v._1
          val paths = v._2.toArray
          processBlobs(paths, outputPartitionTime, par+namespace, par)
        }

        batchLog.warn(s"Start batch ${batchTime}, output partition time:${outputPartitionTime}, namespace:${namespace}")
        val t1 = System.nanoTime
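        // resolve the blob-pointer paths into groups of FileInternal entries keyed by partition, then filter them down to the groups to process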
        val pathsGroups = BlobPointerInput.pathsToGroups(rdd = pathsRDD,
          jobName = dict.getAppName(),
          dict = dict,
          outputTimestamp = outputPartitionTime)
        val pathsFilteredGroups = BlobPointerInput.filterPathGroups(pathsGroups)
        val pathsCount = pathsFilteredGroups.aggregate(0)(_ + _._2.size, _ + _)
        val result =
          if (pathsCount > 0) {
            batchLog.warn(s"Loading filtered blob files count=$pathsCount, First File=${pathsFilteredGroups.head._2.head}")
            if (pathsFilteredGroups.length > 1)
              Await.result(FutureUtil.failFast(pathsFilteredGroups
                .map(kv => Future {
                  processPartition(kv)
                })), 5 minutes).reduce(DataMerger.mergeMapOfDoubles)
            else
              processPartition(pathsFilteredGroups(0))
          }
          else {
            batchLog.warn(s"No valid paths is found to process for this batch")
            Map[String, Double]()
          }
        val batchProcessingTime = (System.nanoTime - t1) / 1E9

        val metrics = Map[String, Double](
          "BatchProcessedET" -> batchProcessingTime
        )

        postMetrics(metrics)
        batchLog.warn(s"End batch ${batchTime}, output partition time:${outputPartitionTime}, namespace:${namespace}")

        metrics ++ result
      }, // end of processPaths
      /*
        process a batch of ConsumerRecords from Kafka
       */
      processConsumerRecord = (rdd: RDD[ConsumerRecord[String,String]], batchTime: Timestamp, batchInterval: Duration, outputPartitionTime: Timestamp) =>{
        processDataset(rdd
          .map(d=>{
            val value = d.value()
            if(value==null) throw new EngineException(s"null bytes from ConsumerRecord")
            //Capture key if present. Key can be null.
            val key = if(d.key!=null) Some("key"->d.key.toString) else None
            (
              value,
              Map.empty[String, String],// Properties
              Map[String, String](
                "offset"->d.offset().toString,
                "partition"->d.partition().toString,
                "serializedKeySize"->d.serializedKeySize().toString,
                "serializedValueSize"->d.serializedValueSize().toString,
                "timestamp"->d.timestamp().toString,
                "timestampType"->d.timestampType().toString,
                "topic"->d.topic()
              ) ++ key,
              FileInternal())
          })
          .toDF(
            ColumnName.RawObjectColumn,
            ColumnName.RawPropertiesColumn,
            ColumnName.RawSystemPropertiesColumn,
            ColumnName.InternalColumnFileInfo
          ), batchTime, batchInterval, outputPartitionTime, null, "")
      } // end of processConsumerRecord
    ) // end of CommonProcessor
  } // end of createProcessor
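
For orientation, here is a minimal sketch of how a streaming host might drive the returned CommonProcessor once per micro-batch. It is illustrative only: `config` (a UnifiedConfig), `jsonStream` (a DStream[String]) and the 60-second batch interval are assumptions, not taken from this file; the real wiring lives in the streaming host rather than in this factory.

    // illustrative sketch, not part of CommonProcessorFactory.scala
    import java.sql.Timestamp
    import scala.concurrent.duration._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    val processor = CommonProcessorFactory.createProcessor(config)   // `config` is an assumed UnifiedConfig

    jsonStream.foreachRDD((rdd: RDD[String], time: Time) => {        // `jsonStream` is an assumed DStream[String]
      val batchTime = new Timestamp(time.milliseconds)
      // process one micro-batch of raw JSON strings; 60 seconds is an assumed batch interval,
      // and batchTime doubles as the output partition time in this sketch
      processor.processJson(rdd, batchTime, 60.seconds, batchTime)
    })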