def loadDataWithPartition()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala [869:1162]


  def loadDataWithPartition(loadParams: CarbonLoadParams): Seq[Row] = {
    val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    val catalogTable: CatalogTable = loadParams.logicalPartitionRelation.catalogTable.get
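    // Expose the operation context to the partition load flow through a thread-level property;
    // it is unset again in the finally block below.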
    CarbonThreadUtil.threadSet("partition.operationcontext", loadParams.operationContext)
    val attributes = if (loadParams.scanResultRDD.isDefined) {
      // take the already re-arranged attributes
      catalogTable.schema.toAttributes
    } else {
      // input data comes from CSV files; build string attributes from the visible table columns
      val allCols = new ArrayBuffer[String]()
      // get only the visible dimensions from table
      allCols ++= table.getVisibleDimensions.asScala.map(_.getColName)
      allCols ++= table.getVisibleMeasures.asScala.map(_.getColName)
      StructType(
        allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(
          StructField(_, StringType))).toAttributes
    }
    var partitionsLen = 0
    val sortScope = CarbonDataProcessorUtil.getSortScope(loadParams.carbonLoadModel.getSortScope)
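    // Convert static partition values to the configured date/timestamp formats and validate
    // that every partition column exists in the table schema.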
    val partitionValues = if (loadParams.finalPartition.nonEmpty) {
      loadParams.finalPartition.filter(_._2.nonEmpty).map { case (col, value) =>
        catalogTable.schema.find(_.name.equalsIgnoreCase(col)) match {
          case Some(c) =>
            CarbonScalaUtil.convertToDateAndTimeFormats(
              value.get,
              c.dataType,
              loadParams.timeStampFormat,
              loadParams.dateFormat)
          case None =>
            throw new AnalysisException(s"$col is not a valid partition column in table ${
              loadParams.carbonLoadModel
                .getDatabaseName
            }.${ loadParams.carbonLoadModel.getTableName }")
        }
      }.toArray
    } else {
      Array[String]()
    }
    var persistedRDD: Option[RDD[InternalRow]] = None
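    // Whether the input data should be re-partitioned based on task-level locality
    // (used by the local-sort branches below).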
    val partitionBasedOnLocality = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
        CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT).toBoolean
    try {
      val query: LogicalPlan = if ((loadParams.dataFrame.isDefined) ||
                                   loadParams.scanResultRDD.isDefined) {
        val (rdd, dfAttributes) = {
            // Rebuild the CSV header so that the partition columns come last
            // (e.g. in the update scenario)
            if (loadParams.finalPartition.nonEmpty) {
              val headers = loadParams.carbonLoadModel
                .getCsvHeaderColumns
                .dropRight(loadParams.finalPartition.size)
              val updatedHeader = headers ++ loadParams.finalPartition.keys.map(_.toLowerCase)
              loadParams.carbonLoadModel.setCsvHeader(updatedHeader.mkString(","))
              loadParams.carbonLoadModel
                .setCsvHeaderColumns(loadParams.carbonLoadModel.getCsvHeader.split(","))
            }
            if (loadParams.dataFrame.isDefined) {
              (loadParams.dataFrame.get.rdd, loadParams.dataFrame.get.schema)
            } else {
              (null, null)
            }
        }
        if (loadParams.dataFrame.isDefined) {
          val expectedColumns = {
            val staticPartCols = loadParams.finalPartition.filter(_._2.isDefined).keySet
              .map(columnName => columnName.toLowerCase())
            attributes.filterNot(a => staticPartCols.contains(a.name.toLowerCase))
          }
          val spatialProperty = catalogTable.storage
            .properties.get(CarbonCommonConstants.SPATIAL_INDEX)
          // For a spatial table, the dataframe attributes will not contain the geoId column.
          val isSpatialTable = spatialProperty.isDefined && spatialProperty.get.nonEmpty &&
                               dfAttributes.length + 1 == expectedColumns.size
          if (expectedColumns.length != dfAttributes.length && !isSpatialTable) {
            throw new AnalysisException(
              s"Cannot insert into table $loadParams.tableName because the number of columns are " +
              s"different: " +
              s"need ${ expectedColumns.length } columns, " +
              s"but query has ${ dfAttributes.length } columns.")
          }
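          // Work out the target index of every dataframe column and every static partition
          // value in the rearranged row.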
          val nonPartitionBounds = expectedColumns.zipWithIndex.map(_._2).toArray
          val partitionBounds = new Array[Int](partitionValues.length)
          if (loadParams.finalPartition.nonEmpty) {
            val nonPartitionSchemaLen = attributes.length - loadParams.finalPartition.size
            var i = nonPartitionSchemaLen
            var index = 0
            var partIndex = 0
            loadParams.finalPartition.values.foreach { p =>
              if (p.isDefined) {
                partitionBounds(partIndex) = nonPartitionSchemaLen + index
                partIndex = partIndex + 1
              } else {
                nonPartitionBounds(i) = nonPartitionSchemaLen + index
                i = i + 1
              }
              index = index + 1
            }
          }
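          // Rearrange each input row: copy the dataframe columns to their target positions and
          // fill in the static partition values.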
          val len = dfAttributes.length + partitionValues.length
          val transRdd = rdd.map { f =>
            val data = new Array[Any](len)
            var i = 0
            val length = f.length
            while (i < length) {
              data(nonPartitionBounds(i)) = f.get(i)
              i = i + 1
            }
            var j = 0
            val boundLength = partitionBounds.length
            while (j < boundLength) {
              data(partitionBounds(j)) = UTF8String.fromString(partitionValues(j))
              j = j + 1
            }
            Row.fromSeq(data)
          }
          val (transformedPlan, partitions, persistedRDDLocal) =
            transformQueryWithRow(
              transRdd,
              loadParams.sparkSession,
              loadParams.carbonLoadModel,
              partitionValues,
              catalogTable,
              attributes,
              sortScope,
              isDataFrame = true,
              table,
              loadParams.finalPartition)
          partitionsLen = partitions
          persistedRDD = persistedRDDLocal
          transformedPlan
        } else {
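          // Load from the already rearranged internal-row RDD (scan result). For local-sort
          // loads the RDD may first be coalesced according to node locality.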
          val rdd = loadParams.scanResultRDD.get
          val newRdd =
            if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
              val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
                DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
              }.distinct.length
              val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
                nodeNumOfData,
                loadParams.sparkSession.sqlContext.sparkContext)
              val coalescedRdd = new DataLoadCoalescedRDD[InternalRow](
                loadParams.sparkSession,
                rdd,
                nodes.toArray.distinct)
              new DataLoadCoalescedUnwrapRDD(coalescedRdd)
            } else {
              rdd
            }
          val (transformedPlan, partitions, persistedRDDLocal) =
            CommonLoadUtils.transformQueryWithInternalRow(
              newRdd,
              loadParams.sparkSession,
              loadParams.carbonLoadModel,
              partitionValues,
              catalogTable,
              attributes,
              sortScope,
              table,
              loadParams.finalPartition)
          partitionsLen = partitions
          persistedRDD = persistedRDDLocal
          transformedPlan
        }
      } else {
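        // Plain CSV load: scan the input files and convert every record into a string-array row.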
        val columnCount = loadParams.carbonLoadModel.getCsvHeaderColumns.length
        val rdd =
          if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
            CsvRDDHelper.csvFileScanRDDForLocalSort(
              loadParams.sparkSession,
              model = loadParams.carbonLoadModel,
              loadParams.hadoopConf)
              .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
          } else {
            CsvRDDHelper.csvFileScanRDD(
              loadParams.sparkSession,
              model = loadParams.carbonLoadModel,
              loadParams.hadoopConf)
              .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
          }
        val (transformedPlan, partitions, persistedRDDLocal) =
          transformQueryWithRow(
            rdd.asInstanceOf[RDD[Row]],
            loadParams.sparkSession,
            loadParams.carbonLoadModel,
            partitionValues,
            catalogTable,
            attributes,
            sortScope,
            isDataFrame = false,
            table,
            loadParams.finalPartition)
        partitionsLen = partitions
        persistedRDD = persistedRDDLocal
        transformedPlan
      }
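      // For an update, reuse the update timestamp as the fact timestamp; otherwise stamp the
      // load with the current time.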
      if (loadParams.updateModel.isDefined) {
        loadParams.carbonLoadModel.setFactTimeStamp(loadParams.updateModel.get.updatedTimeStamp)
      } else if (loadParams.carbonLoadModel.getFactTimeStamp == 0L) {
        loadParams.carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
      }
      val opt = collection.mutable.Map() ++ loadParams.optionsOriginal
      if (loadParams.scanResultRDD.isDefined) {
        opt += ((DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS, "true"))
      }
      // Create and add the segment to the table status.
      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadParams.carbonLoadModel,
        loadParams.isOverwriteTable)
      CarbonHiveIndexMetadataUtil.updateTableStatusVersion(table,
        loadParams.sparkSession,
        loadParams.carbonLoadModel.getLatestTableStatusWriteVersion)
      val convertRelation = convertToLogicalRelation(
        catalogTable,
        loadParams.sizeInBytes,
        loadParams.isOverwriteTable,
        loadParams.carbonLoadModel,
        loadParams.sparkSession,
        loadParams.operationContext,
        loadParams.finalPartition,
        loadParams.updateModel,
        opt,
        loadParams.currPartitions)
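      // Wrap the transformed query into an insert-into command over the converted relation
      // and execute it.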
      val convertedPlan =
        CarbonToSparkAdapter.getInsertIntoCommand(
          table = convertRelation,
          partition = loadParams.finalPartition,
          query = query,
          overwrite = false,
          ifPartitionNotExists = false)
      SparkUtil.setNullExecutionId(loadParams.sparkSession)
      Dataset.ofRows(loadParams.sparkSession, convertedPlan).collect()
    } catch {
      case ex: Throwable =>
        val (executorMessage, errorMessage) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
        if (loadParams.updateModel.isDefined) {
          CarbonScalaUtil.updateErrorInUpdateModel(loadParams.updateModel.get, executorMessage)
        }
        loadParams.operationContext.setProperty("Error message", errorMessage)
        LOGGER.info(errorMessage)
        LOGGER.error(ex)
        throw ex
    } finally {
      CarbonThreadUtil.threadUnset("partition.operationcontext")
      if (loadParams.isOverwriteTable) {
        IndexStoreManager.getInstance().clearIndex(table.getAbsoluteTableIdentifier)
      }
      if (partitionsLen > 1) {
        // Clean the cache only if the RDD was persisted. The unpersist call is kept non-blocking:
        // this has no functional impact because Spark monitors cache usage on each node and
        // evicts old data partitions in a least-recently-used (LRU) fashion.
        persistedRDD match {
          case Some(rdd) => rdd.unpersist(false)
          case _ =>
        }
      }
    }
    // Trigger index pre-priming for the partition table
    if (!StringUtils.isEmpty(loadParams.carbonLoadModel.getSegmentId)) {
      DistributedRDDUtils.triggerPrepriming(loadParams.sparkSession,
        table,
        Seq(),
        loadParams.operationContext,
        loadParams.hadoopConf,
        List(loadParams.carbonLoadModel.getSegmentId))
    }
    try {
      val compactedSegments = new util.ArrayList[String]()
      if (loadParams.updateModel.isEmpty) {
        // Trigger auto compaction
        CarbonDataRDDFactory.handleSegmentMerging(
          loadParams.sparkSession.sqlContext,
          loadParams.carbonLoadModel
            .getCopyWithPartition(loadParams.carbonLoadModel.getCsvHeader,
              loadParams.carbonLoadModel.getCsvDelimiter),
          table,
          compactedSegments,
          loadParams.operationContext)
        loadParams.carbonLoadModel.setMergedSegmentIds(compactedSegments)
      }
    } catch {
      case e: Exception =>
        LOGGER.error(
          "Auto-Compaction has failed. Ignoring this exception because the " +
          "load is passed.", e)
    }
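    // Report the partition specs written for this segment, or fall back to the segment id
    // alone when no specs are available.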
    val specs =
      SegmentFileStore.getPartitionSpecs(loadParams.carbonLoadModel.getSegmentId,
        loadParams.carbonLoadModel.getTablePath,
        loadParams.carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
    if (specs != null && !specs.isEmpty) {
      specs.asScala.map { spec =>
        Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid)
      }
    } else {
      Seq(Row(loadParams.carbonLoadModel.getSegmentId))
    }
  }
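
For reference, the rows returned above carry either the partition specs of the written segment
(partition values, location and uuid) or, as a fallback, only the segment id. Below is a minimal
sketch of how a caller might consume that result; it assumes `loadParams` and `LOGGER` are in
scope and that `loadDataWithPartition` is invoked on the `CommonLoadUtils` object, with the field
positions taken from the Row construction above:

  val resultRows: Seq[Row] = CommonLoadUtils.loadDataWithPartition(loadParams)
  resultRows.foreach { row =>
    if (row.length == 3) {
      // partition spec row: partition values joined by "/", spec location and spec uuid
      LOGGER.info(s"Loaded partition ${ row.getString(0) } at ${ row.getString(1) } " +
                  s"(uuid=${ row.getString(2) })")
    } else {
      // fallback row that only carries the segment id
      LOGGER.info(s"Loaded segment ${ row.getString(0) }")
    }
  }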