in hologres-connector-spark-base/src/main/scala/com/alibaba/hologres/spark/utils/SparkHoloUtil.scala [132:207]
/**
 * Inspects the Hologres table and server version, then resolves any "auto"
 * write/read mode on [[HologresConfigs]] to a concrete mode, and probes
 * whether a direct connection is possible.
 *
 * Mutates and returns the passed-in `hologresConfigs` (writeMode, readMode,
 * needReshuffle, directConnect may be updated).
 *
 * @param sparkSchema     schema of the Spark DataFrame being written/read;
 *                        every field name must exist in the Hologres table
 * @param hologresConfigs connection and mode configuration to resolve
 * @return the same `hologresConfigs` instance, with modes resolved
 * @throws IOException if the Hologres server version cannot be determined
 * @throws IllegalArgumentException if a Spark column is missing from the table
 */
def chooseBestMode(sparkSchema: StructType, hologresConfigs: HologresConfigs): HologresConfigs = {
  val holoClient: HoloClient = new HoloClient(hologresConfigs.holoConfig)
  try {
    val holoSchema = holoClient.getTableSchema(TableName.valueOf(hologresConfigs.table))
    // `try` is an expression in Scala: assign the version directly instead of
    // mutating a null-initialized var.
    val holoVersion: HoloVersion =
      try holoClient.sql[HoloVersion](getHoloVersion).get()
      catch {
        case e: Exception =>
          throw new IOException("Failed to get holo version", e)
      }
    // bulk_load_on_conflict with all columns is supported after 2.2.25;
    // with a subset of columns it is supported after 3.1.0.
    val supportBulkLoadOnConflict = holoSchema.getPrimaryKeys.length > 0 &&
      ((holoVersion.compareTo(new HoloVersion(2, 2, 25)) > 0 && sparkSchema.fields.length == holoSchema.getColumnSchema.length)
        || holoVersion.compareTo(new HoloVersion(3, 1, 0)) > 0)
    val supportBulkLoad = holoSchema.getPrimaryKeys.length == 0 && holoVersion.compareTo(new HoloVersion(2, 1, 0)) > 0
    val supportStreamCopy = holoVersion.compareTo(new HoloVersion(1, 3, 24)) > 0
    val couldReshuffle = holoSchema.getDistributionKeys.length > 0
    // choose best write mode
    if ("auto" == hologresConfigs.writeMode) {
      if (supportBulkLoadOnConflict) {
        hologresConfigs.writeMode = CopyMode.BULK_LOAD_ON_CONFLICT
        // Data has not been reshuffled yet, so mark needReshuffle as true.
        if (!hologresConfigs.reshuffleByHoloDistributionKey && !hologresConfigs.needReshuffle && couldReshuffle) {
          hologresConfigs.needReshuffle = true
        }
      } else if (supportBulkLoad) {
        hologresConfigs.writeMode = CopyMode.BULK_LOAD
        // A table without a primary key can still be reshuffled if it has a distribution key.
        if (!hologresConfigs.reshuffleByHoloDistributionKey && !hologresConfigs.needReshuffle && couldReshuffle) {
          hologresConfigs.needReshuffle = true
        }
      } else if (supportStreamCopy) {
        hologresConfigs.writeMode = CopyMode.STREAM
      } else {
        hologresConfigs.writeMode = "insert"
      }
      logger.info(s"choose best write mode: ${hologresConfigs.writeMode}")
      if (hologresConfigs.needReshuffle) {
        logger.info(s"need reshuffle.")
      }
    }
    // choose best read mode
    if ("auto" == hologresConfigs.readMode) {
      val supportCompressed = holoVersion.compareTo(new HoloVersion(3, 0, 24)) >= 0
      // Validate that every Spark column exists in the table, and detect jsonb
      // columns (which force the plain "select" read path). Deliberately keeps
      // iterating after a jsonb is found so that ALL columns get validated.
      var hasJsonBType = false
      sparkSchema.fields.foreach(column => {
        val columnIndex = holoSchema.getColumnIndex(column.name)
        if (columnIndex == null) {
          throw new IllegalArgumentException(String.format("column %s does not exist in hologres table %s", column.name, holoSchema.getTableName))
        }
        if (holoSchema.getColumn(columnIndex).getTypeName == "jsonb") {
          hasJsonBType = true
        }
      })
      if (hasJsonBType) {
        hologresConfigs.readMode = "select"
      } else if (supportCompressed) {
        hologresConfigs.readMode = "bulk_read_compressed"
      } else {
        hologresConfigs.readMode = "bulk_read"
      }
      logger.info(s"choose best read mode: ${hologresConfigs.readMode}")
    }
    // Probe direct connection once here; if it fails, the individual tasks
    // will not retry it.
    if (hologresConfigs.directConnect) {
      hologresConfigs.directConnect = JDBCUtil.couldDirectConnect(hologresConfigs)
    }
    hologresConfigs
  } finally {
    // holoClient is assigned from `new` before the try and can never be null.
    holoClient.close()
  }
}