in hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java [116:264]
/**
 * Commits new view metadata to the Hive Metastore while holding the lock returned by
 * {@code lockObject()}.
 *
 * <p>The method loads the current metastore entry, verifies that its metadata location still
 * matches the location of {@code base}, updates the entry (storage descriptor and properties)
 * and persists it. The outcome is tracked in a local {@code commitStatus} that is handed to
 * {@code HiveOperationsBase.cleanupMetadataAndUnlock} in the {@code finally} block.
 *
 * @param base the metadata this commit is based on; {@code null} is treated as "creating a new
 *     view"
 * @param metadata the new metadata to commit
 * @throws AlreadyExistsException if a new view is being created but the metastore entry already
 *     has a metadata location, or if persisting reports that the object already exists
 * @throws CommitFailedException if the metastore's metadata location differs from the base
 *     location, if the metastore reports a concurrent modification, or if a lock operation
 *     outside the persist step fails
 * @throws CommitStateUnknownException if the lock could not be confirmed active around the
 *     persist step, or if the follow-up status check cannot determine the outcome
 * @throws ValidationException if the metastore rejects the object as invalid
 * @throws RuntimeException if a metastore (Thrift) operation fails or the thread is interrupted
 */
public void doCommit(ViewMetadata base, ViewMetadata metadata) {
// A null base means there is no previous metadata, i.e. this commit creates the view.
boolean newView = base == null;
String newMetadataLocation = writeNewMetadataIfRequired(metadata);
// Always false here; only forwarded to storageDescriptor(...) below.
boolean hiveEngineEnabled = false;
// Start pessimistic: any exit path that does not explicitly change this reports FAILURE to the
// cleanup call in the finally block.
CommitStatus commitStatus = CommitStatus.FAILURE;
// Becomes true when an entry already exists in the metastore; passed to persistTable(...).
boolean updateHiveView = false;
HiveLock lock = lockObject();
try {
// The lock is taken before reading the metastore entry, so the read, the checks and the
// write below all happen while it is held.
lock.lock();
Table tbl = loadHmsTable();
if (tbl != null) {
// If we try to create the view but the metadata location is already set, then we had a
// concurrent commit
if (newView
&& tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
!= null) {
// The message names the existing object as a view or a table depending on its Hive table type.
throw new AlreadyExistsException(
"%s already exists: %s.%s",
TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType())
? ContentType.VIEW.value()
: ContentType.TABLE.value(),
database,
viewName);
}
updateHiveView = true;
LOG.debug("Committing existing view: {}", fullName);
} else {
// No entry in the metastore yet: build a fresh one from the new metadata.
tbl = newHMSView(metadata);
LOG.debug("Committing new view: {}", fullName);
}
tbl.setSd(
HiveOperationsBase.storageDescriptor(
metadata.schema(),
metadata.location(),
hiveEngineEnabled)); // set to pick up any schema changes
// Compare the location recorded in the metastore with the location of the base metadata.
// Objects.equals treats null == null as a match, which covers the new-view case where neither
// side has a location.
String metadataLocation =
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
throw new CommitFailedException(
"Cannot commit: Base metadata location '%s' is not same as the current view metadata location '%s' for %s.%s",
baseMetadataLocation, metadataLocation, database, viewName);
}
// get Iceberg props that have been removed
// (keys present in the base properties but absent from the new metadata's properties)
Set<String> removedProps = emptySet();
if (base != null) {
removedProps =
base.properties().keySet().stream()
.filter(key -> !metadata.properties().containsKey(key))
.collect(Collectors.toSet());
}
HMSTablePropertyHelper.updateHmsTableForIcebergView(
newMetadataLocation,
tbl,
metadata,
removedProps,
maxHiveTablePropertySize,
currentMetadataLocation());
// Check the lock once more right before writing. A LockException thrown here is outside the
// inner try, so it is handled by the outer catch and surfaces as CommitFailedException.
lock.ensureActive();
try {
// NOTE(review): with Hive locking enabled the third argument is null, otherwise it is the base
// metadata location; presumably persistTable uses it as the expected current location for a
// metastore-side check -- confirm in persistTable.
persistTable(tbl, updateHiveView, hiveLockEnabled(conf) ? null : baseMetadataLocation);
// Check the lock again after writing; only if it is still active is the commit marked SUCCESS.
lock.ensureActive();
commitStatus = CommitStatus.SUCCESS;
} catch (LockException le) {
// A LockException raised inside this try leaves the outcome of the write uncertain, so the
// status is UNKNOWN rather than FAILURE.
commitStatus = CommitStatus.UNKNOWN;
throw new CommitStateUnknownException(
"Failed to heartbeat for hive lock while "
+ "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
+ "Please check the commit history. If you are running into this issue, try reducing "
+ "iceberg.hive.lock-heartbeat-interval-ms.",
le);
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
// Translate the metastore's exception type into Iceberg's; commitStatus stays FAILURE.
throw new AlreadyExistsException(e, "View already exists: %s.%s", database, viewName);
} catch (InvalidObjectException e) {
throw new ValidationException(e, "Invalid Hive object for %s.%s", database, viewName);
} catch (CommitFailedException | CommitStateUnknownException e) {
// Rethrow unchanged so these are not swallowed by the catch-all below.
throw e;
} catch (Throwable e) {
// Catch-all: first recognise two known failure messages by substring match, then fall back
// to checking whether the commit actually went through.
if (e.getMessage() != null
&& e.getMessage()
.contains(
"The table has been modified. The parameter value for key '"
+ BaseMetastoreTableOperations.METADATA_LOCATION_PROP
+ "' is")) {
throw new CommitFailedException(
e, "The view %s.%s has been modified concurrently", database, viewName);
}
if (e.getMessage() != null
&& e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
throw new RuntimeException(
"Failed to acquire locks from metastore because the underlying metastore "
+ "view 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not "
+ "support transactions. To fix this use an alternative metastore.",
e);
}
LOG.error(
"Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
database,
viewName,
e);
// Not redundant: if checkCommitStatus(...) itself throws, the assignment below never happens
// and the finally block sees UNKNOWN instead of the initial FAILURE.
commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
commitStatus =
checkCommitStatus(
viewName,
newMetadataLocation,
metadata.properties(),
() -> checkCurrentMetadataLocation(newMetadataLocation));
switch (commitStatus) {
case SUCCESS:
// The check reported success: the original exception is dropped and the method completes
// normally.
break;
case FAILURE:
// Rethrow the original exception as-is.
throw e;
case UNKNOWN:
throw new CommitStateUnknownException(e);
}
}
} catch (TException e) {
throw new RuntimeException(
String.format("Metastore operation failed for %s.%s", database, viewName), e);
} catch (InterruptedException e) {
// Restore the interrupt flag before wrapping, so callers can still observe the interruption.
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during commit", e);
} catch (LockException e) {
// Reached for lock failures outside the inner try (lock.lock() and the first ensureActive()).
throw new CommitFailedException(e);
} finally {
// Runs on every exit path with the final commitStatus.
// NOTE(review): presumably removes the newly written metadata file when the status is FAILURE
// and releases the lock -- confirm in HiveOperationsBase.cleanupMetadataAndUnlock.
HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock);
}
LOG.info(
"Committed to view {} with the new metadata location {}", fullName, newMetadataLocation);
}