override def prepareWrite()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala [73:240]
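
This method configures the Hadoop job for a CarbonData write and returns the OutputWriterFactory that Spark uses to create one CarbonOutputWriter per task: it installs CarbonOutputCommitter, builds a CarbonLoadModel from the write options and table properties, and serializes the model into the job configuration for the tasks to pick up.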


  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
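    // Route the write through Carbon's output committer and output format, disabling the
    // _SUCCESS marker and task-attempt-path deletion in favor of Carbon's own commit handling.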
    val conf = job.getConfiguration
    conf.setClass(
      SQLConf.OUTPUT_COMMITTER_CLASS.key,
      classOf[CarbonOutputCommitter],
      classOf[CarbonOutputCommitter])
    conf.set("carbondata.commit.protocol", "carbondata.commit.protocol")
    conf.set("mapreduce.task.deleteTaskAttemptPath", "false")
    conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
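    // Resolve the target CarbonTable and start building the load model: column compressor
    // from the table properties (falling back to the system default), load metrics, and the
    // latest table status write version.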
    val table = CarbonEnv.getCarbonTable(
      TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
    val model = new CarbonLoadModel
    val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
      .getOrElse(CarbonCommonConstants.COMPRESSOR,
        CompressorFactory.getInstance().getCompressor.getName)
    model.setColumnCompressor(columnCompressor)
    model.setMetrics(new DataLoadMetrics())
    model.setLatestTableStatusWriteVersion(options.getOrElse("latestversion", ""))

    val carbonProperty = CarbonProperties.getInstance()
    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
    val tableProperties = table.getTableInfo.getFactTable.getTableProperties
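    // Resolve the sort scope: table property first, then session/system configuration,
    // finally the system default.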
    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
    // If the spatial index property is configured, set a flag to indicate that spatial
    // columns are present, so that InputProcessorStepWithNoConverterImpl can generate values
    // for those columns, convert them, and then apply the sort/write steps.
    val spatialIndex = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
    if (spatialIndex != null) {
      val sortScope = optionsFinal.get("sort_scope")
      if (sortScope.equalsIgnoreCase(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) {
        // The spatial index non-schema column must be sorted.
        optionsFinal.put("sort_scope", "LOCAL_SORT")
      }
      model.setNonSchemaColumnsPresent(true)
    }
    optionsFinal
      .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
    // If DATEFORMAT is not present in load options, check from table properties.
    if (optionsFinal.get("dateformat").isEmpty) {
      optionsFinal.put("dateformat", Maps.getOrDefault(tableProperties,
        "dateformat", CarbonProperties.getInstance
          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)))
    }
    // If TIMESTAMPFORMAT is not present in load options, check from table properties.
    if (optionsFinal.get("timestampformat").isEmpty) {
      optionsFinal.put("timestampformat", Maps.getOrDefault(tableProperties,
        "timestampformat", CarbonProperties.getInstance
          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)))
    }
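    // The file header lists all data columns followed by the partition columns, lowercased.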
    val partitionStr =
      table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
        _.getColumnName.toLowerCase).mkString(",")
    optionsFinal.put(
      "fileheader",
      dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
    optionsFinal.put("header", "false")
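    // Merge the caller's options with the resolved defaults and build the load model.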
    val optionsLocal = new mutable.HashMap[String, String]()
    optionsLocal ++= options
    new CarbonLoadModelBuilder(table).build(
      optionsLocal.toMap.asJava,
      optionsFinal,
      model,
      conf)
    CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
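    // Choose the no-converter load path; when NO_REARRANGE_OF_ROWS is set, the
    // re-arrange step is skipped as well.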
    if (options.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
      model.setLoadWithoutConverterWithoutReArrangeStep(true)
    } else {
      model.setLoadWithoutConverterStep(true)
    }
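    // Propagate the static partition spec, if any, to the tasks via the job configuration.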
    val staticPartition = options.getOrElse("staticpartition", null)
    if (staticPartition != null) {
      conf.set("carbon.staticpartition", staticPartition)
    }
    // An update query may remove older segments, so record the segments to be deleted here
    // so that they can be marked as deleted when the table status is updated.
    val segmentsToBeDeleted = options.get("segmentsToBeDeleted")
    if (segmentsToBeDeleted.isDefined) {
      conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segmentsToBeDeleted.get)
    }

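    // Propagate the current partition spec, if any, in the same way.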
    val currPartition = options.getOrElse("currentpartition", null)
    if (currPartition != null) {
      conf.set("carbon.currentpartition", currPartition)
    }
    // Update the load metadata with the current in-progress load.
    val loadEntryOption = CarbonOutputWriter.getObjectFromMap[LoadMetadataDetails](options,
      "currentloadentry")
    loadEntryOption.foreach { loadEntry =>
      model.setSegmentId(loadEntry.getLoadName)
      model.setFactTimeStamp(loadEntry.getLoadStartTime)
      if (!isLoadDetailsContainTheCurrentEntry(
        model.getLoadMetadataDetails.asScala.toArray, loadEntry)) {
        val details =
          SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath),
            table.getTableStatusVersion)
        val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
        list.add(loadEntry)
        model.setLoadMetadataDetails(list)
      }
    }
    // If the user supplied an update timestamp (as in an update query), set it so that it
    // is applied as the load status update time.
    val updateTimeStamp = options.get("updatetimestamp")
    if (updateTimeStamp.isDefined) {
      conf.set(CarbonTableOutputFormat.UPDATE_TIMESTAMP, updateTimeStamp.get)
    }
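    // Record the Spark application name and serialize the completed load model into the
    // job configuration so each task can reconstruct it.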
    conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    CarbonTableOutputFormat.setLoadModel(conf, model)

    new OutputWriterFactory {

      /**
       * Counter used for generating task numbers, so that unique partition numbers can be
       * produced for partitioned tables.
       */
      val counter = new AtomicLong()
      val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()

      override def newInstance(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter = {
        val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
        val appName = context.getConfiguration.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)
        CarbonProperties.getInstance().addProperty(
          CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName)
        val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
        val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
        CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
      }

      /**
       * Generate a task number from the task ID of the task context and the path. It must be
       * unique for partitioned tables.
       */
      private def generateTaskNumber(path: String,
          context: TaskAttemptContext, segmentId: String): String = {
        var partitionNumber: java.lang.Long = taskIdMap.get(path)
        if (partitionNumber == null) {
          partitionNumber = counter.incrementAndGet()
          // Remember the partition number assigned to this path so that the generated
          // task number stays unique per path.
          taskIdMap.put(path, partitionNumber)
        }
        val taskID = context.getTaskAttemptID.getTaskID.getId
        // During compaction the segmentId has a form like "1.1", so strip the '.'
        // before generating the unique number.
        CarbonScalaUtil.generateUniqueNumber(taskID, segmentId.replace(".", ""), partitionNumber)
      }

      override def getFileExtension(context: TaskAttemptContext): String = {
        CarbonTablePath.CARBON_DATA_EXT
      }

    }
  }
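
A minimal, self-contained sketch of the task-numbering scheme the factory above relies on: a shared counter assigns each distinct output path a stable partition number, which is then combined with the task ID and the dot-stripped segment ID. The names TaskNumberGenerator and generate are hypothetical, and the plain string concatenation stands in for CarbonScalaUtil.generateUniqueNumber.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified illustration; the real code delegates the final
// combination to CarbonScalaUtil.generateUniqueNumber.
object TaskNumberGenerator {
  private val counter = new AtomicLong()
  private val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()

  def generate(path: String, taskId: Int, segmentId: String): String = {
    // Each distinct output path gets a stable partition number from the shared counter.
    val partitionNumber = taskIdMap.computeIfAbsent(path, _ => counter.incrementAndGet())
    // Compaction segment IDs look like "1.1"; strip the '.' before combining.
    s"$taskId${segmentId.replace(".", "")}$partitionNumber"
  }
}

// Tasks writing to the same partition path share a partition number; a different
// path receives a fresh one:
//   TaskNumberGenerator.generate("/p=1", 0, "2")  // -> "021"
//   TaskNumberGenerator.generate("/p=1", 1, "2")  // -> "121"
//   TaskNumberGenerator.generate("/p=2", 0, "2")  // -> "022"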