in metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/sql/DirectSqlTable.java [236:305]
/**
 * Applies an Iceberg commit to the stored table parameters.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Reject a request whose metadata map is null or empty.</li>
 *   <li>If the previous-metadata-location check is enabled and a previous location was supplied,
 *       verify the path exists. A failure to reach the filesystem is logged and counted but does
 *       not fail the request.</li>
 *   <li>Read the table's existing parameters through {@code SQL.TABLE_PARAMS_LOCK}
 *       (NOTE(review): presumably this also locks the rows until the surrounding transaction
 *       ends; confirm against the SQL text and the caller's transaction boundaries).</li>
 *   <li>Ask {@code validateIcebergUpdate} whether an update is needed and, if so, insert the new
 *       parameters, update the changed ones and update the table location.</li>
 * </ol>
 *
 * @param tableInfo table info carrying the table name and the new table parameters
 * @throws InvalidMetaException if the previous metadata location does not exist, or if reading
 *                              the existing table parameters fails
 */
public void updateIcebergTable(final TableInfo tableInfo) {
    final QualifiedName tableName = tableInfo.getName();
    final Map<String, String> newTableMetadata = tableInfo.getMetadata();
    //
    // Table info should have the table parameters with the metadata location.
    //
    HiveTableUtil.throwIfTableMetadataNullOrEmpty(tableName, newTableMetadata);
    //
    // If the previous metadata location is not empty, check if it is valid.
    //
    final String previousMetadataLocation = newTableMetadata.get(PARAM_PREVIOUS_METADATA_LOCATION);
    if (config.isIcebergPreviousMetadataLocationCheckEnabled() && !StringUtils.isBlank(previousMetadataLocation)) {
        // Defaults to true so that a filesystem failure does not reject the commit (best effort).
        boolean doesPathExists = true;
        try {
            final Path previousMetadataPath = new Path(previousMetadataLocation);
            doesPathExists = warehouse.getFs(previousMetadataPath).exists(previousMetadataPath);
        } catch (Exception ex) {
            // Deliberately not rethrown: the check is best effort. Keep the cause in the log
            // so that filesystem problems can be diagnosed.
            log.warn("Failed getting the filesystem for {}", previousMetadataLocation, ex);
            registry.counter(HiveMetrics.CounterFileSystemReadFailure.name()).increment();
        }
        if (!doesPathExists) {
            throw new InvalidMetaException(tableName,
                String.format("Invalid metadata for %s..Location %s does not exist",
                    tableName, previousMetadataLocation), null);
        }
    }
    final Long tableId = getTableId(tableName);
    Map<String, String> existingTableMetadata = null;
    log.debug("Lock Iceberg table {}", tableName);
    try {
        existingTableMetadata = jdbcTemplate.query(SQL.TABLE_PARAMS_LOCK,
            new SqlParameterValue[]{new SqlParameterValue(Types.BIGINT, tableId)}, rs -> {
                final Map<String, String> result = Maps.newHashMap();
                while (rs.next()) {
                    result.put(rs.getString(COL_PARAM_KEY), rs.getString(COL_PARAM_VALUE));
                }
                return result;
            });
    } catch (EmptyResultDataAccessException ex) {
        log.info("No parameters defined for iceberg table {}", tableName);
    } catch (Exception ex) {
        final String message = String.format("Failed getting a lock on iceberg table %s", tableName);
        log.warn(message, ex);
        // Propagate the root cause so that callers can see why the read/lock failed.
        throw new InvalidMetaException(tableName, message, ex);
    }
    if (existingTableMetadata == null) {
        existingTableMetadata = Maps.newHashMap();
    }
    final boolean needUpdate = validateIcebergUpdate(tableName, existingTableMetadata, newTableMetadata);
    final String existingMetadataLocation = existingTableMetadata.get(PARAM_METADATA_LOCATION);
    final String newMetadataLocation = newTableMetadata.get(PARAM_METADATA_LOCATION);
    log.info("Servicing Iceberg commit request with tableId: {}, needUpdate: {}, "
            + "previousLocation: {}, existingLocation: {}, newLocation: {}",
        tableId, needUpdate, previousMetadataLocation, existingMetadataLocation, newMetadataLocation);
    if (needUpdate) {
        final MapDifference<String, String> diff = Maps.difference(existingTableMetadata, newTableMetadata);
        // Keys present only in the request are new parameters and get inserted.
        insertTableParams(tableId, diff.entriesOnlyOnRight());
        // Keys present on both sides with different values are updated to the requested (right) value.
        final Map<String, String> updateParams = diff.entriesDiffering().entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, s -> s.getValue().rightValue()));
        updateTableParams(tableId, updateParams);
        //
        // In addition to updating the table params, the table location in HMS needs to be updated for usage by
        // external tools, that access HMS directly
        //
        updateTableLocation(tableId, tableInfo);
        log.info("Finished updating Iceberg table with tableId: {}", tableId);
    }
    log.debug("Unlocked Iceberg table {}", tableName);
}