in hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java [132:293]
protected void doCommit(TableMetadata base, TableMetadata metadata) {
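// A null base means there is no existing table metadata, so this commit creates the table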
boolean newTable = base == null;
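// Write the new metadata file (if one has not already been written) and get the location to commit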
String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);
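// Track the commit outcome so the finally block knows whether the new metadata file may be cleaned up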
BaseMetastoreOperations.CommitStatus commitStatus =
BaseMetastoreOperations.CommitStatus.FAILURE;
boolean updateHiveTable = false;
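// The lock may be a real HMS lock or a no-op, depending on how locking is configured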
HiveLock lock = lockObject(base);
try {
lock.lock();
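// Load the current table from HMS; null means it does not exist yet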
Table tbl = loadHmsTable();
if (tbl != null) {
// If we try to create the table but the metadata location is already set, then we had a
// concurrent commit
if (newTable
&& tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
!= null) {
if (TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType())) {
throw new AlreadyExistsException(
"View with same name already exists: %s.%s", database, tableName);
}
throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
}
updateHiveTable = true;
LOG.debug("Committing existing table: {}", fullName);
} else {
tbl =
newHmsTable(
metadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()));
LOG.debug("Committing new table: {}", fullName);
}
tbl.setSd(
HiveOperationsBase.storageDescriptor(
metadata.schema(),
metadata.location(),
hiveEngineEnabled)); // set to pick up any schema changes
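// Optimistic concurrency check: the metadata location recorded in HMS must match the base this commit was built on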
String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
throw new CommitFailedException(
"Cannot commit: Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s",
baseMetadataLocation, metadataLocation, database, tableName);
}
// get Iceberg props that have been removed
Set<String> removedProps = Collections.emptySet();
if (base != null) {
removedProps =
base.properties().keySet().stream()
.filter(key -> !metadata.properties().containsKey(key))
.collect(Collectors.toSet());
}
HMSTablePropertyHelper.updateHmsTableForIcebergTable(
newMetadataLocation,
tbl,
metadata,
removedProps,
hiveEngineEnabled,
maxHiveTablePropertySize,
currentMetadataLocation());
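// Unless configured to keep Hive stats, clear the stats-accurate flag and stop HMS from updating stats during this alter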
if (!keepHiveStats) {
tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
tbl.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
}
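// Verify the lock is still held before touching HMS; a lost heartbeat invalidates the lock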
lock.ensureActive();
try {
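// alter_table for an existing table, create_table for a new one; when Hive locking is disabled,
// the expected base metadata location is passed so HMS itself can reject concurrent changes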
persistTable(
tbl, updateHiveTable, hiveLockEnabled(base, conf) ? null : baseMetadataLocation);
lock.ensureActive();
commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS;
} catch (LockException le) {
commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
throw new CommitStateUnknownException(
"Failed to heartbeat for hive lock while "
+ "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
+ "Please check the commit history. If you are running into this issue, try reducing "
+ "iceberg.hive.lock-heartbeat-interval-ms.",
le);
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName);
} catch (InvalidObjectException e) {
throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName);
} catch (CommitFailedException | CommitStateUnknownException e) {
throw e;
} catch (Throwable e) {
if (e.getMessage() != null
&& e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
throw new RuntimeException(
"Failed to acquire locks from metastore because the underlying metastore "
+ "table 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not "
+ "support transactions. To fix this use an alternative metastore.",
e);
}
commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
if (e.getMessage() != null
&& e.getMessage()
.contains(
"The table has been modified. The parameter value for key '"
+ HiveTableOperations.METADATA_LOCATION_PROP
+ "' is")) {
// It's possible the HMS client incorrectly retries a successful operation, due to a network
// issue for example, and triggers this exception. So we need to double-check to make sure
// this is really a concurrent modification. Hitting this exception means no pending
// requests, if any, can succeed later, so it's safe to check the status in strict mode.
commitStatus = checkCommitStatusStrict(newMetadataLocation, metadata);
if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
throw new CommitFailedException(
e, "The table %s.%s has been modified concurrently", database, tableName);
}
} else {
LOG.error(
"Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
database,
tableName,
e);
commitStatus = checkCommitStatus(newMetadataLocation, metadata);
}
switch (commitStatus) {
case SUCCESS:
break;
case FAILURE:
throw e;
case UNKNOWN:
throw new CommitStateUnknownException(e);
}
}
} catch (TException e) {
throw new RuntimeException(
String.format("Metastore operation failed for %s.%s", database, tableName), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during commit", e);
} catch (LockException e) {
throw new CommitFailedException(e);
} finally {
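// Always unlock; the new metadata file is only removed when the commit is known to have failed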
HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock);
}
LOG.info(
"Committed to table {} with the new metadata location {}", fullName, newMetadataLocation);
}