in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala [869:1162]
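/**
 * Loads data into a partitioned carbon table: builds a logical plan from the input
 * (a DataFrame, a scan-result RDD, or the CSV files), wraps it in an insert-into command
 * and executes it, then triggers pre-priming and, for non-update flows, auto compaction
 * for the new segment.
 *
 * @return the partition specs written for the segment, or a single row with the segment id
 *         when no specs are available.
 */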
def loadDataWithPartition(loadParams: CarbonLoadParams): Seq[Row] = {
val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val catalogTable: CatalogTable = loadParams.logicalPartitionRelation.catalogTable.get
CarbonThreadUtil.threadSet("partition.operationcontext", loadParams.operationContext)
val attributes = if (loadParams.scanResultRDD.isDefined) {
// take the already re-arranged attributes
catalogTable.schema.toAttributes
} else {
// Input comes from CSV files: build string-typed attributes for the logical plan.
val allCols = new ArrayBuffer[String]()
// get only the visible dimensions from table
allCols ++= table.getVisibleDimensions.asScala.map(_.getColName)
allCols ++= table.getVisibleMeasures.asScala.map(_.getColName)
StructType(
allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(
StructField(_, StringType))).toAttributes
}
var partitionsLen = 0
val sortScope = CarbonDataProcessorUtil.getSortScope(loadParams.carbonLoadModel.getSortScope)
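// Convert the static partition values to the table's date/timestamp formats;
// unknown partition columns are rejected.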
val partitionValues = if (loadParams.finalPartition.nonEmpty) {
loadParams.finalPartition.filter(_._2.nonEmpty).map { case (col, value) =>
catalogTable.schema.find(_.name.equalsIgnoreCase(col)) match {
case Some(c) =>
CarbonScalaUtil.convertToDateAndTimeFormats(
value.get,
c.dataType,
loadParams.timeStampFormat,
loadParams.dateFormat)
case None =>
throw new AnalysisException(s"$col is not a valid partition column in table ${
loadParams.carbonLoadModel
.getDatabaseName
}.${ loadParams.carbonLoadModel.getTableName }")
}
}.toArray
} else {
Array[String]()
}
var persistedRDD: Option[RDD[InternalRow]] = None
val partitionBasedOnLocality = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT).toBoolean
try {
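// Build the logical plan for the insert from one of three inputs: a DataFrame,
// a scan-result RDD, or the CSV files themselves.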
val query: LogicalPlan = if ((loadParams.dataFrame.isDefined) ||
loadParams.scanResultRDD.isDefined) {
val (rdd, dfAttributes) = {
// For partitioned loads, re-append the lower-cased partition columns to the CSV header,
// then take the RDD and schema from the DataFrame (if present).
if (loadParams.finalPartition.nonEmpty) {
val headers = loadParams.carbonLoadModel
.getCsvHeaderColumns
.dropRight(loadParams.finalPartition.size)
val updatedHeader = headers ++ loadParams.finalPartition.keys.map(_.toLowerCase)
loadParams.carbonLoadModel.setCsvHeader(updatedHeader.mkString(","))
loadParams.carbonLoadModel
.setCsvHeaderColumns(loadParams.carbonLoadModel.getCsvHeader.split(","))
}
if (loadParams.dataFrame.isDefined) {
(loadParams.dataFrame.get.rdd, loadParams.dataFrame.get.schema)
} else {
(null, null)
}
}
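// DataFrame input: validate the column count and re-arrange each row so that
// partition values land in the expected positions before transforming the query.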
if (loadParams.dataFrame.isDefined) {
val expectedColumns = {
val staticPartCols = loadParams.finalPartition.filter(_._2.isDefined).keySet
.map(columnName => columnName.toLowerCase())
attributes.filterNot(a => staticPartCols.contains(a.name.toLowerCase))
}
val spatialProperty = catalogTable.storage
.properties.get(CarbonCommonConstants.SPATIAL_INDEX)
// For spatial table, dataframe attributes will not contain geoId column.
val isSpatialTable = spatialProperty.isDefined && spatialProperty.get.nonEmpty &&
dfAttributes.length + 1 == expectedColumns.size
if (expectedColumns.length != dfAttributes.length && !isSpatialTable) {
throw new AnalysisException(
s"Cannot insert into table $loadParams.tableName because the number of columns are " +
s"different: " +
s"need ${ expectedColumns.length } columns, " +
s"but query has ${ dfAttributes.length } columns.")
}
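// nonPartitionBounds maps each input column (including dynamic partition columns) to its
// target position in the output row; partitionBounds holds the positions reserved for the
// static partition values.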
val nonPartitionBounds = expectedColumns.zipWithIndex.map(_._2).toArray
val partitionBounds = new Array[Int](partitionValues.length)
if (loadParams.finalPartition.nonEmpty) {
val nonPartitionSchemaLen = attributes.length - loadParams.finalPartition.size
var i = nonPartitionSchemaLen
var index = 0
var partIndex = 0
loadParams.finalPartition.values.foreach { p =>
if (p.isDefined) {
partitionBounds(partIndex) = nonPartitionSchemaLen + index
partIndex = partIndex + 1
} else {
nonPartitionBounds(i) = nonPartitionSchemaLen + index
i = i + 1
}
index = index + 1
}
}
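// Widen every input row to hold the partition columns and fill the static partition
// slots with the converted partition values.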
val len = dfAttributes.length + partitionValues.length
val transRdd = rdd.map { f =>
val data = new Array[Any](len)
var i = 0
val length = f.length
while (i < length) {
data(nonPartitionBounds(i)) = f.get(i)
i = i + 1
}
var j = 0
val boundLength = partitionBounds.length
while (j < boundLength) {
data(partitionBounds(j)) = UTF8String.fromString(partitionValues(j))
j = j + 1
}
Row.fromSeq(data)
}
val (transformedPlan, partitions, persistedRDDLocal) =
transformQueryWithRow(
transRdd,
loadParams.sparkSession,
loadParams.carbonLoadModel,
partitionValues,
catalogTable,
attributes,
sortScope,
isDataFrame = true, table, loadParams.finalPartition)
partitionsLen = partitions
persistedRDD = persistedRDDLocal
transformedPlan
} else {
val rdd = loadParams.scanResultRDD.get
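// For local-sort loads with locality-based partitioning, coalesce the RDD so that each
// node processes its co-located data.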
val newRdd =
if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
}.distinct.length
val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
nodeNumOfData,
loadParams.sparkSession.sqlContext.sparkContext)
val coalescedRdd = new DataLoadCoalescedRDD[InternalRow](
loadParams.sparkSession,
rdd,
nodes.toArray.distinct)
new DataLoadCoalescedUnwrapRDD(coalescedRdd)
} else {
rdd
}
val (transformedPlan, partitions, persistedRDDLocal) =
CommonLoadUtils.transformQueryWithInternalRow(
newRdd,
loadParams.sparkSession,
loadParams.carbonLoadModel,
partitionValues,
catalogTable,
attributes,
sortScope,
table,
loadParams.finalPartition)
partitionsLen = partitions
persistedRDD = persistedRDDLocal
transformedPlan
}
} else {
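// No DataFrame or scan-result RDD: scan the CSV files directly and convert each record
// into a string-array row.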
val columnCount = loadParams.carbonLoadModel.getCsvHeaderColumns.length
val rdd =
if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
CsvRDDHelper.csvFileScanRDDForLocalSort(
loadParams.sparkSession,
model = loadParams.carbonLoadModel,
loadParams.hadoopConf)
.map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
} else {
CsvRDDHelper.csvFileScanRDD(
loadParams.sparkSession,
model = loadParams.carbonLoadModel,
loadParams.hadoopConf)
.map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
}
val (transformedPlan, partitions, persistedRDDLocal) =
transformQueryWithRow(
rdd.asInstanceOf[RDD[Row]],
loadParams.sparkSession,
loadParams.carbonLoadModel,
partitionValues,
catalogTable,
attributes,
sortScope,
isDataFrame = false,
table,
loadParams.finalPartition)
partitionsLen = partitions
persistedRDD = persistedRDDLocal
transformedPlan
}
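// For update flows reuse the update operation's timestamp; otherwise stamp the load with
// the current time if no fact timestamp has been set yet.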
if (loadParams.updateModel.isDefined) {
loadParams.carbonLoadModel.setFactTimeStamp(loadParams.updateModel.get.updatedTimeStamp)
} else if (loadParams.carbonLoadModel.getFactTimeStamp == 0L) {
loadParams.carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
}
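// Copy the original load options; rows from a scan-result RDD are already in the internal
// order, so ask the loader to skip row re-arrangement.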
val opt = collection.mutable.Map() ++ loadParams.optionsOriginal
if (loadParams.scanResultRDD.isDefined) {
opt += ((DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS, "true"))
}
// Create and add the segment to the table status.
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadParams.carbonLoadModel,
loadParams.isOverwriteTable)
CarbonHiveIndexMetadataUtil.updateTableStatusVersion(table,
loadParams.sparkSession,
loadParams.carbonLoadModel.getLatestTableStatusWriteVersion)
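// Convert the catalog table into a LogicalRelation carrying the load model, options and
// partition info needed by the insert command.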
val convertRelation = convertToLogicalRelation(
catalogTable,
loadParams.sizeInBytes,
loadParams.isOverwriteTable,
loadParams.carbonLoadModel,
loadParams.sparkSession,
loadParams.operationContext,
loadParams.finalPartition,
loadParams.updateModel,
opt,
loadParams.currPartitions)
val convertedPlan =
CarbonToSparkAdapter.getInsertIntoCommand(
table = convertRelation,
partition = loadParams.finalPartition,
query = query,
overwrite = false,
ifPartitionNotExists = false)
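// Execute the converted insert plan; collect() triggers the actual data load.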
SparkUtil.setNullExecutionId(loadParams.sparkSession)
Dataset.ofRows(loadParams.sparkSession, convertedPlan).collect()
} catch {
case ex: Throwable =>
val (executorMessage, errorMessage) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
if (loadParams.updateModel.isDefined) {
CarbonScalaUtil.updateErrorInUpdateModel(loadParams.updateModel.get, executorMessage)
}
loadParams.operationContext.setProperty("Error message", errorMessage)
LOGGER.info(errorMessage)
LOGGER.error(ex)
throw ex
} finally {
CarbonThreadUtil.threadUnset("partition.operationcontext")
if (loadParams.isOverwriteTable) {
IndexStoreManager.getInstance().clearIndex(table.getAbsoluteTableIdentifier)
}
if (partitionsLen > 1) {
// Clean the cache only if the RDD was persisted. Unpersist is kept non-blocking: this has
// no functional impact because Spark automatically monitors cache usage on each node and
// drops old data partitions in a least-recently-used (LRU) fashion.
persistedRDD match {
case Some(rdd) => rdd.unpersist(false)
case _ =>
}
}
}
// Pre-priming for Partition table here
if (!StringUtils.isEmpty(loadParams.carbonLoadModel.getSegmentId)) {
DistributedRDDUtils.triggerPrepriming(loadParams.sparkSession,
table,
Seq(),
loadParams.operationContext,
loadParams.hadoopConf,
List(loadParams.carbonLoadModel.getSegmentId))
}
try {
val compactedSegments = new util.ArrayList[String]()
if (loadParams.updateModel.isEmpty) {
// Trigger auto compaction
CarbonDataRDDFactory.handleSegmentMerging(
loadParams.sparkSession.sqlContext,
loadParams.carbonLoadModel
.getCopyWithPartition(loadParams.carbonLoadModel.getCsvHeader,
loadParams.carbonLoadModel.getCsvDelimiter),
table,
compactedSegments,
loadParams.operationContext)
loadParams.carbonLoadModel.setMergedSegmentIds(compactedSegments)
}
} catch {
case e: Exception =>
LOGGER.error(
"Auto-Compaction has failed. Ignoring this exception because the " +
"load is passed.", e)
}
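// Return the partition specs written for this segment, or just the segment id if no specs
// were created.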
val specs =
SegmentFileStore.getPartitionSpecs(loadParams.carbonLoadModel.getSegmentId,
loadParams.carbonLoadModel.getTablePath,
loadParams.carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
if (specs != null && !specs.isEmpty) {
specs.asScala.map { spec =>
Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid)
}
} else {
Seq(Row(loadParams.carbonLoadModel.getSegmentId))
}
}