override def run()

in spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoOdpsTable.scala [63:256]


  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
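    // Overall flow: validate the partition spec, build an ODPS batch write
    // session for the target table, run the distributed write, then refresh
    // the cached table metadata.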
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val externalCatalog: HiveExternalCatalog =
      sparkSession.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog]

    if (table.tableType == CatalogTableType.EXTERNAL) {
      throw new SparkException(s"Unsupported table type for table write ${table.tableType}")
    }

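    // Partition columns without a value in the INSERT spec are dynamic;
    // those with an explicit value are static.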
    val numDynamicPartitions = partition.values.count(_.isEmpty)
    val numStaticPartitions = partition.values.count(_.nonEmpty)
    val odpsPartitionSpec = new PartitionSpec

    val partitionSchema = table.partitionSchema
    val partitionColumnNames = table.partitionColumnNames

    // By this time, the partition map must match the table's partition columns
    if (partitionColumnNames.toSet != partition.keySet) {
      throw QueryExecutionErrors.requestedPartitionsMismatchTablePartitionsError(table, partition)
    }

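    // Split the output columns into partition columns and data columns.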
    val outputPartitionColumns =
      outputColumns.filter(c => partitionSchema.getFieldIndex(c.name).isDefined)
    val outputPartitionSet = AttributeSet(outputPartitionColumns)
    val dataColumns = outputColumns.filterNot(outputPartitionSet.contains)

    if (partitionSchema.nonEmpty) {
      // val partitionSpec = partition.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
      val partitionSpec = partition.map {
        // TODO: null partition
        case (key, Some(value)) => key -> value
        case (key, None) => key -> ""
      }

      // Validate the partition spec when dynamic partitions are present
      if (numDynamicPartitions > 0) {
        // Report error if dynamic partitioning is not enabled
        if (!hadoopConf.get("odps.exec.dynamic.partition", "true").toBoolean) {
          throw new SparkException("Dynamic partition is disabled. " +
            "Either enable it by setting odps.exec.dynamic.partition=true or specify partition column values")
        }

        // Report error if dynamic partition strict mode is on but no static partition is found
        if (numStaticPartitions == 0 &&
          hadoopConf.get("odps.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
          throw new SparkException("Dynamic partition strict mode requires at least one static partition column. " +
            "To turn this off set odps.exec.dynamic.partition.mode=nonstrict")
        }

        // Report error if any static partition appears after a dynamic partition
        val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
        if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
          throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
        }

        val dynamicPartitionSchema = StructType(partitionSchema.takeRight(numDynamicPartitions))
        dynamicPartitionSchema.map(_.dataType).foreach {
          case StringType | LongType | IntegerType | ShortType | ByteType =>
          case dt: DataType =>
            throw new SparkException(s"Unsupported partition column type: ${dt.simpleString}")
        }
      }

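      // Static partition values precede any dynamic ones, so only the leading
      // static columns are copied into the ODPS PartitionSpec.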
      if (numStaticPartitions > 0) {
        partitionColumnNames.take(numStaticPartitions).foreach { field =>
          odpsPartitionSpec.set(field, partitionSpec(field))
        }
      }

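      // With IF NOT EXISTS and a fully static partition spec, skip the write
      // when the target partition already exists.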
      val isStaticPartition = numStaticPartitions > 0 && numDynamicPartitions == 0

      if (isStaticPartition && ifPartitionNotExists) {
        // scalastyle:off
        // ifNotExists is only valid with static partition, refer to
        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
        // scalastyle:on
        val oldPart = sparkSession.sharedState.externalCatalog.getPartitionOption(
          table.database,
          table.identifier.table,
          partitionSpec)
        if (oldPart.isDefined) {
          return Seq.empty[Row]
        }
      }
    }

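    // Prepare the ODPS batch write session: writer provider, environment
    // settings, Arrow options, and the capabilities requested from the sink.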
    val provider = OdpsOptions.odpsTableWriterProvider(conf)
    val settings = OdpsClient.get.getEnvironmentSettings
    val project = table.database
    val tableName = table.identifier.table

    val arrowOptions = ArrowOptions.newBuilder()
      .withDatetimeUnit(TimestampUnit.MILLI)
      .withTimestampUnit(TimestampUnit.MICRO).build()

    val writeCapabilities = TableWriteCapabilities.newBuilder()
      .supportDynamicPartition(true)
      .supportHashBuckets(true)
      .supportRangeBuckets(true)
      .build()

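    // Only hash-clustered tables are writable; resolve the table's bucket and
    // sort columns against the query output.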
    val odpsBucketSpec =
      if (table.bucketSpec.isDefined) externalCatalog.getOdpsBucketSpec(project, tableName) else None
    if (odpsBucketSpec.isDefined && !odpsBucketSpec.get.clusterType.equalsIgnoreCase("hash")) {
      throw new SparkException(s"Writing ${odpsBucketSpec.get.clusterType} bucketed tables is not supported")
    }
    val bucketAttributes = odpsBucketSpec match {
      case Some(OdpsBucketSpec(_, _, bucketColumnNames, _)) =>
        bucketColumnNames.map(name => {
          query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse(
            throw new AnalysisException(
                  s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
          ).asInstanceOf[Attribute]
        })
      case _ => Seq.empty[Attribute]
    }
    val bucketSortOrders = odpsBucketSpec match {
      case Some(OdpsBucketSpec(_, _, _, sortColumns)) =>
        sortColumns.map(col => {
          val attr = query.resolve(col.name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse(
            throw new AnalysisException(
                  s"Unable to resolve ${col.name} given [${query.output.map(_.name).mkString(", ")}]")
          ).asInstanceOf[Attribute]
          SortOrder(attr, col.order.toUpperCase() match {
            case "ASC" => Ascending
            case _ => Descending
          })
        })
      case _ => Seq.empty[SortOrder]
    }

    val sinkBuilder = new TableWriteSessionBuilder()
      .identifier(TableIdentifier.of(project, tableName))
      .withArrowOptions(arrowOptions)
      .withCapabilities(writeCapabilities)
      .withSettings(settings)
      .overwrite(overwrite)
      .withSessionProvider(provider)

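    // Pin the static partition values on the write session; dynamic partition
    // values come from the data at write time.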
    if (partitionSchema.nonEmpty && numStaticPartitions > 0) {
      sinkBuilder.partition(odpsPartitionSpec)
    }

    val batchSink = sinkBuilder.buildBatchWriteSession
    logInfo(s"Create table sink ${batchSink.getId} for ${batchSink.getTableIdentifier}")

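    // Assemble the job description shared by the write tasks: Hadoop conf,
    // sink, column split, stats trackers, and writer tuning options read from
    // the session conf.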
    val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
    val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
    val statsTracker = new OdpsWriteJobStatsTracker(metrics)
    val arrowDataFormat = new DataFormat(DataFormat.Type.ARROW, DataFormat.Version.V5)

    val description = new WriteJobDescription(
      serializableHadoopConf = serializableHadoopConf,
      batchSink = batchSink,
      staticPartition = odpsPartitionSpec,
      allColumns = outputColumns,
      dataColumns = dataColumns,
      partitionColumns = outputPartitionColumns,
      dynamicPartitionColumns = outputPartitionColumns,
      maxRecordsPerFile = sparkSession.sessionState.conf.maxRecordsPerFile,
      statsTrackers = Seq(statsTracker),
      writeBatchSize = OdpsOptions.odpsVectorizedWriterBatchSize(sparkSession.sessionState.conf),
      timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone,
      supportArrowWriter = batchSink.supportsDataFormat(arrowDataFormat),
      enableArrowExtension = OdpsOptions.odpsEnableArrowExtension(sparkSession.sessionState.conf),
      compressionCodec = OdpsOptions.odpsTableWriterCompressCodec(sparkSession.sessionState.conf),
      chunkSize = OdpsOptions.odpsWriterChunkSize(sparkSession.sessionState.conf),
      maxRetries = OdpsOptions.odpsWriterMaxRetires(sparkSession.sessionState.conf),
      maxSleepIntervalMs = OdpsOptions.odpsWriterRetrySleepIntervalMs(sparkSession.sessionState.conf),
      maxBlocks = OdpsOptions.odpsWriterMaxBlocks(sparkSession.sessionState.conf)
    )

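    // Execute the distributed write, passing the bucket columns and sort order
    // for bucketed tables.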
    OdpsTableWriter.write(
      sparkSession,
      child,
      batchSink,
      description,
      outputColumns,
      table.bucketSpec,
      bucketAttributes,
      bucketSortOrders,
      overwrite)

    // Invalidate the cache.
    sparkSession.sessionState.catalog.refreshTable(table.identifier)

    Seq.empty[Row]
  }
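
For reference, a minimal sketch (not part of the source) of how the static-after-dynamic ordering check above behaves; the partition column names and values here are hypothetical:

  // Hypothetical spec: "ds" has no value (dynamic), "region" is static ("cn").
  val partitionColumnNames = Seq("ds", "region")
  val partitionSpec = Map("ds" -> "", "region" -> "cn")
  // Dynamic columns map to "" above, so isEmpty marks them as dynamic.
  val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) // Seq(true, false)
  // An adjacent (dynamic, static) pair means a static column follows a dynamic
  // one, which run() rejects with ErrorMsg.PARTITION_DYN_STA_ORDER.
  assert(isDynamic.init.zip(isDynamic.tail).contains((true, false)))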