in amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveTables.java [178:293]
/**
 * Creates a keyed mixed-hive table: a base Iceberg table, a change Iceberg table, and the
 * corresponding Hive metastore entry.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>Resolve the base, change and table locations from {@code tableMeta}.
 *   <li>Fill table properties and apply a default full-optimizing trigger interval when the
 *       caller did not set one.
 *   <li>Create the base and change Iceberg tables and record a default name mapping on each.
 *   <li>Either update the parameters of an already existing Hive table, or create a new Hive
 *       table, depending on {@code allowExistedHiveTable(tableMeta)}.
 * </ol>
 *
 * @param tableMeta table metadata; its properties are modified by this method
 * @param schema schema used for both Iceberg tables and for a newly created Hive table
 * @param primaryKeySpec primary key definition of the keyed table
 * @param partitionSpec partition definition used for both Iceberg tables and the Hive table
 * @return the keyed table wrapping the created base and change tables
 * @throws IllegalStateException if creating the base or change Iceberg table fails
 * @throws RuntimeException if the Hive metastore operation fails or the calling thread is
 *     interrupted while waiting for it (the interrupt flag is restored in that case)
 */
protected KeyedTable createKeyedTable(
    TableMeta tableMeta,
    Schema schema,
    PrimaryKeySpec primaryKeySpec,
    PartitionSpec partitionSpec) {
  boolean allowExistedHiveTable = allowExistedHiveTable(tableMeta);
  TableIdentifier tableIdentifier = TableIdentifier.of(tableMeta.getTableIdentifier());
  String baseLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_BASE);
  String changeLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_CHANGE);
  String tableLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_TABLE);

  // NOTE(review): the hive location is read only after fillTableProperties, presumably because
  // that call populates it - keep this ordering unless confirmed otherwise.
  fillTableProperties(tableMeta);
  String hiveLocation =
      tableMeta.getProperties().get(HiveTableProperties.BASE_HIVE_LOCATION_ROOT);

  // Default the full-optimizing trigger interval to 1 day (86400000 ms) when not configured.
  if (!tableMeta
      .getProperties()
      .containsKey(TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL)) {
    tableMeta.putToProperties(TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, "86400000");
  }

  AuthenticatedHadoopFileIO fileIO =
      AuthenticatedFileIOs.buildRecoverableHadoopFileIO(
          tableIdentifier,
          tableLocation,
          tableMeta.getProperties(),
          tableMetaStore,
          catalogProperties);

  Table baseIcebergTable =
      createIcebergTableWithNameMapping(
          schema,
          partitionSpec,
          tableMeta.getProperties(),
          baseLocation,
          "create base table failed");
  UnkeyedHiveTable baseTable =
      new KeyedHiveTable.HiveBaseInternalTable(
          tableIdentifier,
          MixedFormatCatalogUtil.useMixedTableOperations(
              baseIcebergTable, baseLocation, fileIO, tableMetaStore.getConfiguration()),
          fileIO,
          tableLocation,
          hiveClientPool,
          catalogProperties,
          false);

  Table changeIcebergTable =
      createIcebergTableWithNameMapping(
          schema,
          partitionSpec,
          tableMeta.getProperties(),
          changeLocation,
          "create change table failed");
  ChangeTable changeTable =
      new KeyedHiveTable.HiveChangeInternalTable(
          tableIdentifier,
          MixedFormatCatalogUtil.useMixedTableOperations(
              changeIcebergTable, changeLocation, fileIO, tableMetaStore.getConfiguration()),
          fileIO,
          catalogProperties);

  Map<String, String> metaProperties = tableMeta.getProperties();
  // NOTE(review): nothing in this method removes the base/change Iceberg tables created above
  // if the Hive step below fails - confirm whether callers are expected to clean up.
  try {
    hiveClientPool.run(
        client -> {
          if (allowExistedHiveTable) {
            // Reuse the existing Hive table: merge the mixed-format properties into its
            // parameters and persist them.
            org.apache.hadoop.hive.metastore.api.Table hiveTable =
                client.getTable(tableIdentifier.getDatabase(), tableIdentifier.getTableName());
            Map<String, String> hiveParameters = hiveTable.getParameters();
            hiveParameters.putAll(constructProperties(primaryKeySpec, tableMeta));
            hiveTable.setParameters(hiveParameters);
            client.alterTable(
                tableIdentifier.getDatabase(), tableIdentifier.getTableName(), hiveTable);
          } else {
            // Build a new Hive table whose storage descriptor uses the configured (or default)
            // file format.
            org.apache.hadoop.hive.metastore.api.Table hiveTable =
                newHiveTable(tableMeta, schema, partitionSpec);
            hiveTable.setSd(
                HiveTableUtil.storageDescriptor(
                    schema,
                    partitionSpec,
                    hiveLocation,
                    FileFormat.valueOf(
                        PropertyUtil.propertyAsString(
                                metaProperties,
                                TableProperties.DEFAULT_FILE_FORMAT,
                                TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)
                            .toUpperCase(Locale.ENGLISH))));
            setProToHive(hiveTable, primaryKeySpec, tableMeta);
            client.createTable(hiveTable);
          }
          return null;
        });
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe it.
    Thread.currentThread().interrupt();
    throw new RuntimeException(
        "Failed to create hive table:" + tableMeta.getTableIdentifier(), e);
  } catch (TException e) {
    throw new RuntimeException(
        "Failed to create hive table:" + tableMeta.getTableIdentifier(), e);
  }
  return new KeyedHiveTable(
      tableMeta, tableLocation, primaryKeySpec, hiveClientPool, baseTable, changeTable);
}

/**
 * Creates an Iceberg table at {@code location} through {@code tableMetaStore.doAs} and stores a
 * default name mapping, derived from the created table's schema, in its properties.
 *
 * @param schema schema of the table to create
 * @param partitionSpec partition spec of the table to create
 * @param properties properties passed to the table creation
 * @param location location of the table to create
 * @param failureMessage message of the {@link IllegalStateException} thrown on failure
 * @return the created Iceberg table
 * @throws IllegalStateException if creation or the property commit fails
 */
private Table createIcebergTableWithNameMapping(
    Schema schema,
    PartitionSpec partitionSpec,
    Map<String, String> properties,
    String location,
    String failureMessage) {
  return tableMetaStore.doAs(
      () -> {
        try {
          Table createdTable = tables.create(schema, partitionSpec, properties, location);
          createdTable
              .updateProperties()
              .set(
                  org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING,
                  NameMappingParser.toJson(MappingUtil.create(createdTable.schema())))
              .commit();
          return createdTable;
        } catch (Exception e) {
          throw new IllegalStateException(failureMessage, e);
        }
      });
}