public void updateIcebergTable()

in metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/sql/DirectSqlTable.java [236:305]


    /**
     * Applies an Iceberg commit to the table parameters stored for the given table.
     *
     * <p>The method proceeds in this order:
     * <ol>
     *   <li>Rejects table info whose metadata is null or empty.</li>
     *   <li>When the previous-metadata-location check is enabled in the config and a previous
     *       location is supplied, verifies that the location exists on the filesystem. A failure
     *       to reach the filesystem is logged and counted but does not fail the request.</li>
     *   <li>Reads the existing table parameters with {@code SQL.TABLE_PARAMS_LOCK}
     *       (presumably a locking read; the lock scope depends on the caller's transaction).</li>
     *   <li>Asks {@code validateIcebergUpdate} whether an update is needed and, if so, inserts the
     *       new parameters, updates the changed ones and updates the table location.</li>
     * </ol>
     *
     * @param tableInfo table info carrying the table name and the new table parameters
     * @throws InvalidMetaException if the previous metadata location does not exist, or if the
     *                              existing table parameters could not be read
     */
    public void updateIcebergTable(final TableInfo tableInfo) {
        final QualifiedName tableName = tableInfo.getName();
        final Map<String, String> newTableMetadata = tableInfo.getMetadata();
        //
        // Table info should have the table parameters with the metadata location.
        //
        HiveTableUtil.throwIfTableMetadataNullOrEmpty(tableName, newTableMetadata);

        //
        // If the previous metadata location is not empty, check if it is valid.
        //
        final String previousMetadataLocation = newTableMetadata.get(PARAM_PREVIOUS_METADATA_LOCATION);
        if (config.isIcebergPreviousMetadataLocationCheckEnabled() && !StringUtils.isBlank(previousMetadataLocation)) {
            // Defaults to true so that a filesystem outage does not reject an otherwise valid commit.
            boolean doesPathExists = true;
            try {
                final Path previousMetadataPath = new Path(previousMetadataLocation);
                doesPathExists = warehouse.getFs(previousMetadataPath).exists(previousMetadataPath);
            } catch (Exception e) {
                // Best effort: keep going, but record the cause so the failure can be diagnosed.
                log.warn("Failed getting the filesystem for {}", previousMetadataLocation, e);
                registry.counter(HiveMetrics.CounterFileSystemReadFailure.name()).increment();
            }
            if (!doesPathExists) {
                throw new InvalidMetaException(tableName,
                    String.format("Invalid metadata for %s..Location %s does not exist",
                        tableName, previousMetadataLocation), null);
            }
        }
        final Long tableId = getTableId(tableName);
        Map<String, String> existingTableMetadata = null;
        log.debug("Lock Iceberg table {}", tableName);
        try {
            existingTableMetadata = jdbcTemplate.query(SQL.TABLE_PARAMS_LOCK,
                new SqlParameterValue[]{new SqlParameterValue(Types.BIGINT, tableId)}, rs -> {
                    final Map<String, String> result = Maps.newHashMap();
                    while (rs.next()) {
                        result.put(rs.getString(COL_PARAM_KEY), rs.getString(COL_PARAM_VALUE));
                    }
                    return result;
                });
        } catch (EmptyResultDataAccessException ex) {
            log.info("No parameters defined for iceberg table {}", tableName);
        } catch (Exception ex) {
            final String message = String.format("Failed getting a lock on iceberg table %s", tableName);
            log.warn(message, ex);
            // Chain the original failure so callers can see why the read failed.
            throw new InvalidMetaException(tableName, message, ex);
        }
        if (existingTableMetadata == null) {
            // No stored parameters: treat every new parameter as an insert.
            existingTableMetadata = Maps.newHashMap();
        }
        final boolean needUpdate = validateIcebergUpdate(tableName, existingTableMetadata, newTableMetadata);
        final String existingMetadataLocation = existingTableMetadata.get(PARAM_METADATA_LOCATION);
        final String newMetadataLocation = newTableMetadata.get(PARAM_METADATA_LOCATION);
        log.info("Servicing Iceberg commit request with tableId: {}, needUpdate: {}, "
                + "previousLocation: {}, existingLocation: {}, newLocation: {}",
                tableId, needUpdate, previousMetadataLocation, existingMetadataLocation, newMetadataLocation);
        if (needUpdate) {
            final MapDifference<String, String> diff = Maps.difference(existingTableMetadata, newTableMetadata);
            // Keys present only in the new metadata are inserted.
            insertTableParams(tableId, diff.entriesOnlyOnRight());
            // Keys present on both sides with different values are updated to the new (right) value.
            final Map<String, String> updateParams = diff.entriesDiffering().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, s -> s.getValue().rightValue()));
            updateTableParams(tableId, updateParams);
            //
            // In addition to updating the table params, the table location in HMS needs to be updated for usage by
            // external tools, that access HMS directly
            //
            updateTableLocation(tableId, tableInfo);
            log.info("Finished updating Iceberg table with tableId: {}", tableId);
        }
        log.debug("Unlocked Iceberg table {}", tableName);
    }