override def processData()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala [85:507]


  override def processData(sparkSession: SparkSession): Seq[Row] = {
    val relations = CarbonSparkUtil.collectCarbonRelation(targetDsOri.logicalPlan)
    val st = System.currentTimeMillis()
    // if the input data is empty, return early to avoid unnecessary work. This can happen in
    // streaming scenarios where no new data has been pushed to the stream.
    if (srcDS.rdd.isEmpty()) {
      return Seq()
    }
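    // capture the SubqueryAlias names (if any) of the target and source datasets; they are used
    // later to qualify the key columns in the join condition and during deduplication.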
    val targetDsAliasName = targetDsOri.logicalPlan match {
      case alias: SubqueryAlias =>
        alias.alias
      case _ => null
    }
    val sourceAliasName = srcDS.logicalPlan match {
      case alias: SubqueryAlias =>
        alias.alias
      case _ => null
    }
    if (relations.length != 1) {
      throw new UnsupportedOperationException(
        "Carbon table supposed to be present in merge dataset")
    }

    val properties = CarbonProperties.getInstance()
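    // when operationType is set (the key-column driven UPSERT/UPDATE/DELETE/INSERT flow, without
    // explicit merge matches), validate the streamer deduplication property and either enforce
    // the target schema or handle schema evolution of the source dataset.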
    if (operationType != null) {
      val filterDupes = properties
        .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
          CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
      val isSchemaEnforcementEnabled = properties
        .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
          CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
      if (!MergeOperationType.withName(operationType.toUpperCase)
            .equals(MergeOperationType.INSERT) && filterDupes) {
        throw new MalformedCarbonCommandException("property CARBON_STREAMER_INSERT_DEDUPLICATE" +
                                                  " should only be set with operation type INSERT")
      }
      if (isSchemaEnforcementEnabled) {
        // call the util function to verify that the incoming schema matches the target schema
        CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS)
      } else {
        CarbonMergeDataSetUtil.handleSchemaEvolution(
          targetDsOri, srcDS, sparkSession)
      }
    }

    // Target dataset must be backed by carbondata table.
    val tgtTable = relations.head.carbonRelation.carbonTable
    val targetCarbonTable: CarbonTable = CarbonEnv.getCarbonTable(Option(tgtTable.getDatabaseName),
      tgtTable.getTableName)(sparkSession)

    // select only the required columns; this avoids a lot of IO and shuffling.
    val targetDs = if (mergeMatches == null && operationType != null) {
      targetDsOri.select(keyColumn)
    } else {
      // Get all the required columns of targetDS by going through all match conditions and actions.
      val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession)
      targetDsOri.select(columns: _*)
    }
    // decide join type based on match conditions or based on merge operation type
    val joinType = if (mergeMatches == null && operationType != null) {
      MergeOperationType.withName(operationType.toUpperCase) match {
        case MergeOperationType.UPDATE | MergeOperationType.DELETE =>
          "inner"
        case MergeOperationType.UPSERT =>
          "right_outer"
        case MergeOperationType.INSERT =>
          null
      }
    } else {
      decideJoinType
    }

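    // determine the join columns: just the key column for the key-column based operations, or
    // the distinct column names referenced in the merge join expression otherwise.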
    val joinColumns = if (mergeMatches == null) {
      Seq(keyColumn)
    } else {
      mergeMatches.joinExpr.expr.collect {
        case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty =>
          // Say the join condition is something like A.id = B.id; it will be an EqualTo
          // expression with the left expression being UnresolvedAttribute(A.id) and the right
          // being a Literal(B.id). Since we need the column name here, we can directly check the
          // left UnresolvedAttribute. Its nameParts is an ArrayBuffer containing "A" and "id";
          // since "id" is the column name, nameParts.tail.head gives us the column name "id".
          unresolvedAttribute.nameParts.tail.head
      }.distinct
    }

    // repartition the srcDS if the target table is bucketed and the bucketing columns contain
    // the join columns
    val repartitionedSrcDs =
      if (targetCarbonTable.getBucketingInfo != null &&
          targetCarbonTable.getBucketingInfo
            .getListOfColumns
            .asScala
            .map(_.getColumnName).containsSlice(joinColumns)) {
        srcDS.repartition(targetCarbonTable.getBucketingInfo.getNumOfRanges,
          joinColumns.map(srcDS.col): _*)
      } else {
        srcDS
      }

    // deduplicate the incoming dataset
    // TODO: handle the case for partial updates
    val orderingField = properties.getProperty(
      CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD,
      CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT)
    val deduplicatedSrcDs = if (keyColumn != null) {
      CarbonMergeDataSetUtil.deduplicateBeforeWriting(repartitionedSrcDs,
        targetDs,
        sparkSession,
        sourceAliasName,
        targetDsAliasName,
        keyColumn,
        orderingField,
        targetCarbonTable)
    } else {
      repartitionedSrcDs
    }

    // cache the source data as it will be scanned multiple times
    deduplicatedSrcDs.cache()
    val deDuplicatedRecords = deduplicatedSrcDs.count()
    LOGGER.info(s"Number of records from source data: $deDuplicatedRecords")
    // Create accumulators to log the stats
    val stats = Stats(createLongAccumulator("insertedRows"),
      createLongAccumulator("updatedRows"),
      createLongAccumulator("deletedRows"))

    var finalCarbonFilesToScan: Array[String] = Array.empty[String]
    // pruning happens only when the join type is not full_outer; for full_outer we need all the
    // records from the left (target) table, so the target table must not be pruned based on the
    // min/max of the source table.
    val isMinMaxPruningEnabled = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.CARBON_CDC_MINMAX_PRUNING_ENABLED,
        CarbonCommonConstants.CARBON_CDC_MINMAX_PRUNING_ENABLED_DEFAULT).toBoolean
    var didNotPrune = false
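    // min-max pruning: build interval trees over the block-level min/max values of the join
    // columns and probe them with the source rows to find the target carbondata files that
    // actually need to be scanned. break() exits this block whenever pruning cannot be applied.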
    breakable {
      if (isMinMaxPruningEnabled && joinType != null && !joinType.equalsIgnoreCase("full_outer")) {
        // 1. get all the join columns involved in equal-to conditions (equi joins)
        val targetKeyColumns = CarbonMergeDataSetUtil.getTargetTableKeyColumns(keyColumn,
          targetDsAliasName,
          targetCarbonTable,
          mergeMatches)
        val joinCarbonColumns = targetKeyColumns.collect {
          case column => targetCarbonTable.getColumnByName(column)
        }

        LOGGER
          .info(s"Key columns for join are: ${ joinCarbonColumns.map(_.getColName).mkString(",") }")

        val columnToIndexMap: util.Map[String, Integer] = new util.LinkedHashMap[String, Integer]
        // get the min-max cached columns and, based on them, determine the index to check in the
        // min-max array or Index Row
        val minMaxColumns = targetCarbonTable.getMinMaxCachedColumnsInCreateOrder
        if (minMaxColumns.size() != 0) {
          if (minMaxColumns.size() ==
              targetCarbonTable.getTableInfo.getFactTable.getListOfColumns.size() ||
              minMaxColumns.size() == 1 && minMaxColumns.get(0).equalsIgnoreCase("All columns")) {
            joinCarbonColumns.foreach { column =>
              if (column.isDimension) {
                columnToIndexMap.put(column.getColName, column.getOrdinal)
              } else {
                columnToIndexMap.put(column.getColName,
                  targetCarbonTable.getVisibleDimensions.size() + column.getOrdinal)
              }
            }
          } else {
            // handle the case where only some columns are cached for min/max and check whether
            // those cached columns contain the target key (join) columns
            val joinColumnsPresentInMinMaxCacheCols = joinCarbonColumns.map(_.getColName)
              .intersect(minMaxColumns.asScala.toSet)
            if (joinColumnsPresentInMinMaxCacheCols.isEmpty ||
                joinColumnsPresentInMinMaxCacheCols.size != joinCarbonColumns.size) {
              // 1. if none of the join columns are present in cache columns, then all blocklets
              // will be selected, so pruning is not required
              // 2. when one of the columns is not present in cache columns, no need to prune, as it
              // may lead to wrong data due to different filter conditions like OR
              didNotPrune = true
              break()
            }
          }
        }

        // get the required splits; this also loads the min-max cache either in the index server
        // or in the driver, depending on the configuration
        val columnMinMaxInBlocklet: util.LinkedHashMap[String, util.List[FilePathMinMaxVO]] =
          new util.LinkedHashMap[String, util.List[FilePathMinMaxVO]]
        val colToSplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]] =
          CarbonMergeDataSetUtil.getSplitsAndLoadToCache(targetCarbonTable,
            deduplicatedSrcDs,
            columnMinMaxInBlocklet,
            columnToIndexMap,
            sparkSession)

        LOGGER.info("Finished getting splits from driver or index server")

        // 2. build tuples of (file path, min, max) for the required columns; the min/max values
        // are converted to actual values based on the datatype. The collection stores only
        // block-level min and max.
        val fileMinMaxMapListOfAllJoinColumns:
          mutable.ArrayBuffer[(mutable.Map[String, (AnyRef, AnyRef)], CarbonColumn)] =
          mutable.ArrayBuffer.empty[(mutable.Map[String, (AnyRef, AnyRef)], CarbonColumn)]

        val joinColumnsToComparatorMap:
          mutable.LinkedHashMap[CarbonColumn, SerializableComparator] =
          mutable.LinkedHashMap.empty[CarbonColumn, SerializableComparator]
        joinCarbonColumns.foreach { joinColumn =>
          val joinDataType = joinColumn.getDataType
          val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) &&
                                      (joinDataType != DataTypes.DATE)
          val comparator = if (isPrimitiveAndNotDate) {
            Comparator.getComparator(joinDataType)
          } else if (joinDataType == DataTypes.STRING) {
            null
          } else {
            Comparator.getComparatorByDataTypeForMeasure(joinDataType)
          }
          joinColumnsToComparatorMap += (joinColumn -> comparator)
        }

        // 3. prepare (filepath, (min, max)) at a block level.
        CarbonMergeDataSetUtil.addFilePathAndMinMaxTuples(colToSplitsFilePathAndMinMaxMap,
          targetCarbonTable,
          joinColumnsToComparatorMap,
          fileMinMaxMapListOfAllJoinColumns)

        // 4. prepare a mapping from each join column to a range tree built from the (file path,
        // min, max) entries for that column. The assumption here is that the join expression
        // contains few columns in practice, typically just a primary key column.
        val joinColumnToTreeMapping: mutable.LinkedHashMap[CarbonColumn, BlockMinMaxTree] =
          mutable.LinkedHashMap.empty[CarbonColumn, BlockMinMaxTree]
        fileMinMaxMapListOfAllJoinColumns.foreach { case (fileMinMaxMap, joinCarbonColumn) =>
          val joinDataType = joinCarbonColumn.getDataType
          val isDimension = joinCarbonColumn.isDimension
          val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) &&
                                      (joinDataType != DataTypes.DATE)
          val comparator = joinColumnsToComparatorMap(joinCarbonColumn)
          val rangeIntervalTree = new BlockMinMaxTree(isPrimitiveAndNotDate,
            isDimension, joinDataType, comparator)
          fileMinMaxMap.foreach { case (filePath, minMax) =>
            rangeIntervalTree.insert(new MinMaxNode(filePath, minMax._1, minMax._2))
          }
          joinColumnToTreeMapping += ((joinCarbonColumn, rangeIntervalTree))
        }

        // 5. map over the source rows and, for each row, search the min-max trees prepared above
        // to find the file paths that need to be scanned.
        finalCarbonFilesToScan = CarbonMergeDataSetUtil.getFilesToScan(joinCarbonColumns,
          joinColumnToTreeMapping,
          deduplicatedSrcDs)

        LOGGER.info(s"Finished min-max pruning. Carbondata files to scan during merge is: ${
          finalCarbonFilesToScan.length}")
      }
    }

    // if this is a plain upsert/update/delete/insert operation, dispatch to the matching
    // merge handler
    if (mergeMatches == null && operationType != null) {
      val isInsertOperation = operationType.equalsIgnoreCase(MergeOperationType.INSERT.toString)
      val frame = if (isMinMaxPruningEnabled && !didNotPrune) {
        // if min-max pruning is enabled, add the getBlockPaths UDF filter so that only the
        // pruned carbondata files of the target table are scanned.
        if (!isInsertOperation) {
          targetDs
            .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
            .where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')")
            .join(deduplicatedSrcDs.select(keyColumn),
              expr(s"$targetDsAliasName.$keyColumn = $sourceAliasName.$keyColumn"),
              joinType)
        } else {
          null
        }
      } else {
        if (!isInsertOperation) {
          targetDs
            .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
            .join(deduplicatedSrcDs.select(keyColumn),
              expr(s"$targetDsAliasName.$keyColumn = $sourceAliasName.$keyColumn"),
              joinType)
        } else {
          null
        }
      }
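      // dispatch to the handler for the requested operation; each handler performs the actual
      // insert/update/delete using the joined frame (null for plain INSERT) and the
      // deduplicated source dataset.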
      val mergeHandler: MergeHandler =
        MergeOperationType.withName(operationType.toUpperCase) match {
        case MergeOperationType.UPSERT =>
          UpsertHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
        case MergeOperationType.UPDATE =>
          UpdateHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
        case MergeOperationType.DELETE =>
          DeleteHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
        case MergeOperationType.INSERT =>
          InsertHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
        }

      // execute merge handler
      mergeHandler.handleMerge()
      LOGGER.info(
        " Time taken to merge data  :: " + (System.currentTimeMillis() - st))
      // clear the cached src
      deduplicatedSrcDs.unpersist()
      return Seq()
    }
    // validate the merge matches and actions.
    validateMergeActions(mergeMatches, targetDsOri, sparkSession)
    val hasDelAction = mergeMatches.matchList
      .exists(_.getActions.exists(_.isInstanceOf[DeleteAction]))
    val hasUpdateAction = mergeMatches.matchList
      .exists(_.getActions.exists(_.isInstanceOf[UpdateAction]))
    val (insertHistOfUpdate, insertHistOfDelete) = getInsertHistoryStatus(mergeMatches)
    // Update the update mapping with the unfilled columns. From here on, the system assumes all
    // mappings exist.
    mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs)
    // Let's generate all condition combinations as one column and add it as 'status'.
    val condition = generateStatusColumnWithAllCombinations(mergeMatches)

    // Add the getTupleId() UDF to fetch the tuple id used to generate the delete delta.
    val frame = if (isMinMaxPruningEnabled && !didNotPrune) {
      targetDs
        .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
        .withColumn("exist_on_target", lit(1))
        .where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')")
        .join(deduplicatedSrcDs.withColumn("exist_on_src", lit(1)),
          mergeMatches.joinExpr,
          joinType)
        .withColumn(status_on_mergeds, condition)
    } else {
      targetDs
        .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
        .withColumn("exist_on_target", lit(1))
        .join(deduplicatedSrcDs.withColumn("exist_on_src", lit(1)),
          mergeMatches.joinExpr,
          joinType)
        .withColumn(status_on_mergeds, condition)
    }
    if (LOGGER.isDebugEnabled) {
      frame.explain()
    }
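    // build the target column list in create order (minus the invisible dummy measure); it forms
    // the header for the subsequent insert into the target table.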
    val tableCols =
      targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).
        filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
    val header = tableCols.mkString(",")

    val frameWithoutStatusCol = frame.drop(status_on_mergeds)
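    // build a MergeProjection for every update/insert/delete action of each match; these
    // projections are later passed to processIUD along with the target schema.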
    val projections: Seq[Seq[MergeProjection]] = mergeMatches.matchList.map { m =>
      m.getActions.map {
        case u: UpdateAction => MergeProjection(tableCols,
          frameWithoutStatusCol,
          relations.head,
          sparkSession,
          u)
        case i: InsertAction => MergeProjection(tableCols,
          frameWithoutStatusCol,
          relations.head,
          sparkSession,
          i)
        case d: DeleteAction => MergeProjection(tableCols,
          frameWithoutStatusCol,
          relations.head,
          sparkSession,
          d)
        case _ => null
      }.filter(_ != null)
    }

    val targetSchema = StructType(tableCols.map { f =>
      relations.head.carbonRelation.schema.find(_.name.equalsIgnoreCase(f)).get
    } ++ Seq(StructField(status_on_mergeds, IntegerType)))
    val (processedRDD, deltaPath) = processIUD(sparkSession, frame, targetCarbonTable, projections,
      targetSchema, stats)

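    // set up the transaction manager and the mutation action (delete delta / update handling,
    // with optional inserts into the history table) before materializing the processed rows.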
    val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
    val trxMgr = TranxManager(System.currentTimeMillis())

    val mutationAction = MutationActionFactory.getMutationAction(sparkSession,
      targetCarbonTable, hasDelAction, hasUpdateAction,
      insertHistOfUpdate, insertHistOfDelete)

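    // wrap the processed RDD back into a DataFrame so it can be cached, counted and loaded into
    // the target table.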
    val loadDF = Dataset.ofRows(sparkSession,
      LogicalRDD(targetSchema.toAttributes,
        processedRDD)(sparkSession))

    loadDF.cache()
    val count = loadDF.count()
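    // if processIUD wrote a delete delta, apply the mutation action, update the segment status
    // and prepare the UpdateTableModel used by the subsequent insert.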
    val updateTableModel = if (FileFactory.isFileExist(deltaPath)) {
      val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, deltaPath)
      val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
      MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable,
        trxMgr.getLatestTrx, tuple)
      Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx,
        executorErrors, tuple._2, Option.empty))
    } else {
      None
    }

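    // insert the projected rows into the target table (as part of the update transaction when
    // updateTableModel is defined).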
    val dataFrame = loadDF.select(tableCols.map(col): _*)
    MergeUtil.insertDataToTargetTable(sparkSession,
      targetCarbonTable,
      header,
      updateTableModel,
      dataFrame)

    if (hasDelAction && count == 0) {
      MergeUtil.updateStatusIfJustDeleteOperation(targetCarbonTable, trxMgr.getLatestTrx)
    }
    LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
    LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
    LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
    LOGGER.info(
      " Time taken to merge data  :: " + (System.currentTimeMillis() - st))

    // Load the history table if the insert-into-history-table action was added by the user.
    HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head, targetCarbonTable,
      trxMgr, mutationAction, mergeMatches)
    // Do IUD Compaction.
    HorizontalCompaction.tryHorizontalCompaction(
      sparkSession, targetCarbonTable)
    // clear the cached src
    deduplicatedSrcDs.unpersist()
    Seq.empty
  }
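
Usage note: callers normally reach this command through CarbonData's Dataset-level merge/upsert
API rather than by constructing CarbonMergeDataSetCommand directly. The sketch below is only a
hedged illustration of the two code paths handled above; it assumes the implicit builders
described in CarbonData's SCD/CDC guide (e.g. via import org.apache.spark.sql.CarbonSession._),
so treat the exact imports, method names and map signatures as assumptions, not authoritative API.

  import org.apache.spark.sql.CarbonSession._
  import org.apache.spark.sql.functions.col

  // key-column flow: exercises the "mergeMatches == null && operationType != null" branch,
  // with "id" playing the role of keyColumn and operationType = UPSERT.
  targetDS.upsert(sourceDS, "id").execute()

  // merge-match flow: exercises the mergeMatches branch with explicit matched/not-matched actions.
  targetDS.as("A")
    .merge(sourceDS.as("B"), col("A.id").equalTo(col("B.id")))
    .whenMatched()
    .updateExpr(Map(col("value") -> col("B.value")))
    .whenNotMatched()
    .insertExpr(Map(col("id") -> col("B.id"), col("value") -> col("B.value")))
    .execute()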