in integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala [73:240]
override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
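// Route the write through Carbon's output committer and tune the MapReduce committer settings
// so that CarbonData manages the commit and cleanup of the written segment files itself.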
val conf = job.getConfiguration
conf.setClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
classOf[CarbonOutputCommitter],
classOf[CarbonOutputCommitter])
conf.set("carbondata.commit.protocol", "carbondata.commit.protocol")
conf.set("mapreduce.task.deleteTaskAttemptPath", "false")
conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
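// Resolve the target CarbonTable from the session catalog and start assembling the load model
// that drives this write.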
val table = CarbonEnv.getCarbonTable(
TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
val model = new CarbonLoadModel
val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.COMPRESSOR,
CompressorFactory.getInstance().getCompressor.getName)
model.setColumnCompressor(columnCompressor)
model.setMetrics(new DataLoadMetrics())
model.setLatestTableStatusWriteVersion(options.getOrElse("latestversion", ""))
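// Fill the remaining load options with defaults; the sort scope is resolved from the table
// properties first, then from the session-level carbon properties.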
val carbonProperty = CarbonProperties.getInstance()
val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
val tableProperties = table.getTableInfo.getFactTable.getTableProperties
optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
// If the spatial index property is configured, set a flag to indicate that spatial columns are
// present, so that InputProcessorStepWithNoConverterImpl can generate the values for those
// columns, convert them and then apply the sort/write steps.
val spatialIndex =
table.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
if (spatialIndex != null) {
val sortScope = optionsFinal.get("sort_scope")
if (sortScope.equalsIgnoreCase(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) {
// Spatial Index non-schema column must be sorted
optionsFinal.put("sort_scope", "LOCAL_SORT")
}
model.setNonSchemaColumnsPresent(true)
}
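// Resolve the path where bad records of this load should be written.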
optionsFinal
.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
// If DATEFORMAT is not present in the load options, fall back to the table properties.
if (optionsFinal.get("dateformat").isEmpty) {
optionsFinal.put("dateformat", Maps.getOrDefault(tableProperties,
"dateformat", CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)))
}
// If TIMESTAMPFORMAT is not present in the load options, fall back to the table properties.
if (optionsFinal.get("timestampformat").isEmpty) {
optionsFinal.put("timestampformat", Maps.getOrDefault(tableProperties,
"timestampformat", CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)))
}
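// The file header is the lower-cased data schema columns followed by the partition columns.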
val partitionStr =
table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
_.getColumnName.toLowerCase).mkString(",")
optionsFinal.put(
"fileheader",
dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
optionsFinal.put("header", "false")
val optionsLocal = new mutable.HashMap[String, String]()
optionsLocal ++= options
new CarbonLoadModelBuilder(table).build(
optionsLocal.toMap.asJava,
optionsFinal,
model,
conf)
CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
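// Loads through this path skip the converter step; when the caller also requests no
// re-arrangement of rows, that step is skipped as well.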
if (options.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
model.setLoadWithoutConverterWithoutReArrangeStep(true)
} else {
model.setLoadWithoutConverterStep(true)
}
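// Propagate the static partition specification, if any, to the tasks through the job
// configuration.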
val staticPartition = options.getOrElse("staticpartition", null)
if (staticPartition != null) {
conf.set("carbon.staticpartition", staticPartition)
}
// An update query may remove older segments, so pass the segments to be deleted here so that
// they can be marked as deleted when the table status is updated.
val segmentsToBeDeleted = options.get("segmentsToBeDeleted")
if (segmentsToBeDeleted.isDefined) {
conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segmentsToBeDeleted.get)
}
val currPartition = options.getOrElse("currentpartition", null)
if (currPartition != null) {
conf.set("carbon.currentpartition", currPartition)
}
// Update the load model with the current in-progress load entry.
val loadEntryOption = CarbonOutputWriter.getObjectFromMap[LoadMetadataDetails](options,
"currentloadentry")
loadEntryOption.foreach { loadEntry =>
model.setSegmentId(loadEntry.getLoadName)
model.setFactTimeStamp(loadEntry.getLoadStartTime)
if (!isLoadDetailsContainTheCurrentEntry(
model.getLoadMetadataDetails.asScala.toArray, loadEntry)) {
val details =
SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath),
table.getTableStatusVersion)
val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
list.add(loadEntry)
model.setLoadMetadataDetails(list)
}
}
// If the user supplied an update timestamp (update query), propagate it so that it is applied
// when the load status is updated.
val updateTimeStamp = options.get("updatetimestamp")
if (updateTimeStamp.isDefined) {
conf.set(CarbonTableOutputFormat.UPDATE_TIMESTAMP, updateTimeStamp.get)
}
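// Record the name of the writing application and serialise the load model into the
// configuration so that each task can reconstruct it.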
conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
CarbonTableOutputFormat.setLoadModel(conf, model)
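// The returned factory creates one CarbonOutputWriter per task; the task number combines the
// task id with a per-path counter so that file names stay unique across partition directories.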
new OutputWriterFactory {
/**
* Counter used for generating task numbers, so that unique partition numbers can be assigned
* when writing to a partitioned table.
*/
val counter = new AtomicLong()
val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
val appName = context.getConfiguration.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName)
val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
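// Derive the local temporary store locations for this task and register them in the task
// configuration before creating the writer.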
val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
}
/**
* Generate a task number from the task id of the task context and the path. It must be unique
* in case of partition tables.
*/
private def generateTaskNumber(path: String,
context: TaskAttemptContext, segmentId: String): String = {
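// Each distinct output path gets its own number from the shared counter, so writers targeting
// different partition directories do not collide.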
var partitionNumber: java.lang.Long = taskIdMap.get(path)
if (partitionNumber == null) {
partitionNumber = counter.incrementAndGet()
// Generate taskid using the combination of taskid and partition number to make it unique.
taskIdMap.put(path, partitionNumber)
}
val taskID = context.getTaskAttemptID.getTaskID.getId
// In case of compaction the segmentId will be like 1.1, so replace '.' with "" to keep the
// generated number unique.
CarbonScalaUtil.generateUniqueNumber(taskID, segmentId.replace(".", ""), partitionNumber)
}
override def getFileExtension(context: TaskAttemptContext): String = {
CarbonTablePath.CARBON_DATA_EXT
}
}
}