def chooseBestMode()

in hologres-connector-spark-base/src/main/scala/com/alibaba/hologres/spark/utils/SparkHoloUtil.scala [132:207]


  /**
   * Resolves the "auto" write and read modes of `hologresConfigs` against the target Hologres table.
   *
   * A temporary [[HoloClient]] is opened to fetch the table schema and the server version; it is
   * always closed before this method returns. The supplied `hologresConfigs` instance is mutated
   * in place and returned:
   *  - when `writeMode` is "auto", it is replaced by the first supported option among
   *    bulk-load-on-conflict, bulk-load, stream copy and "insert"; for the two bulk-load options
   *    `needReshuffle` may additionally be switched on;
   *  - when `readMode` is "auto", it is replaced by "select", "bulk_read_compressed" or "bulk_read";
   *  - when `directConnect` is enabled, it is replaced by the result of
   *    `JDBCUtil.couldDirectConnect`.
   *
   * @param sparkSchema     schema of the Spark side; its field count and field names are compared
   *                        with the Hologres table
   * @param hologresConfigs connector configuration, updated in place
   * @return the same `hologresConfigs` instance
   * @throws IOException              if the Hologres version cannot be retrieved
   * @throws IllegalArgumentException if `readMode` is "auto" and a Spark column is missing from the
   *                                  Hologres table
   */
  def chooseBestMode(sparkSchema: StructType, hologresConfigs: HologresConfigs): HologresConfigs = {
    val holoClient: HoloClient = new HoloClient(hologresConfigs.holoConfig)
    try {
      val holoSchema = holoClient.getTableSchema(TableName.valueOf(hologresConfigs.table))
      val holoVersion: HoloVersion =
        try holoClient.sql[HoloVersion](getHoloVersion).get()
        catch {
          case e: Exception =>
            throw new IOException("Failed to get holo version", e)
        }

      // True when the server version is strictly greater than the given baseline.
      def newerThan(baseline: HoloVersion): Boolean = holoVersion.compareTo(baseline) > 0

      val hasPrimaryKey = holoSchema.getPrimaryKeys.length > 0
      // bulk_load_on_conflict is supported for full-column writes after 2.2.25,
      // and for partial-column writes after 3.1.0.
      val supportBulkLoadOnConflict = hasPrimaryKey && (
        (newerThan(new HoloVersion(2, 2, 25)) &&
          sparkSchema.fields.length == holoSchema.getColumnSchema.length) ||
          newerThan(new HoloVersion(3, 1, 0)))
      val supportBulkLoad = !hasPrimaryKey && newerThan(new HoloVersion(2, 1, 0))
      val supportStreamCopy = newerThan(new HoloVersion(1, 3, 24))
      val couldReshuffle = holoSchema.getDistributionKeys.length > 0

      // Pick the best write mode.
      if ("auto" == hologresConfigs.writeMode) {
        if (supportBulkLoadOnConflict) {
          hologresConfigs.writeMode = CopyMode.BULK_LOAD_ON_CONFLICT
        } else if (supportBulkLoad) {
          hologresConfigs.writeMode = CopyMode.BULK_LOAD
        } else if (supportStreamCopy) {
          hologresConfigs.writeMode = CopyMode.STREAM
        } else {
          hologresConfigs.writeMode = "insert"
        }

        // Both bulk-load flavours (with or without a primary key) can reshuffle when the table has
        // a distribution key; request it only if the data has not been reshuffled already.
        val bulkLoadChosen = supportBulkLoadOnConflict || supportBulkLoad
        val alreadyReshuffled =
          hologresConfigs.reshuffleByHoloDistributionKey || hologresConfigs.needReshuffle
        if (bulkLoadChosen && !alreadyReshuffled && couldReshuffle) {
          hologresConfigs.needReshuffle = true
        }

        logger.info(s"choose best write mode: ${hologresConfigs.writeMode}")
        if (hologresConfigs.needReshuffle) {
          logger.info("need reshuffle.")
        }
      }

      // Pick the best read mode.
      if ("auto" == hologresConfigs.readMode) {
        val supportCompressed = holoVersion.compareTo(new HoloVersion(3, 0, 24)) >= 0
        // Every Spark column is looked up (and validated) in order; the flag records whether the
        // matching Hologres column is of type jsonb.
        val jsonbFlags = sparkSchema.fields.map { field =>
          val columnIndex = holoSchema.getColumnIndex(field.name)
          if (columnIndex == null) {
            throw new IllegalArgumentException(String.format("column %s does not exist in hologres table %s", field.name, holoSchema.getTableName))
          }
          holoSchema.getColumn(columnIndex).getTypeName == "jsonb"
        }
        val hasJsonBType = jsonbFlags.contains(true)

        hologresConfigs.readMode =
          if (hasJsonBType) "select"
          else if (supportCompressed) "bulk_read_compressed"
          else "bulk_read"
        logger.info(s"choose best read mode: ${hologresConfigs.readMode}")
      }

      // Probe the direct connection once here; if it is not possible, the individual tasks do not
      // need to try it themselves.
      if (hologresConfigs.directConnect) {
        hologresConfigs.directConnect = JDBCUtil.couldDirectConnect(hologresConfigs)
      }
      hologresConfigs
    } finally {
      holoClient.close()
    }
  }