in gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java [310:382]
public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata(this.conf));
Table table;
try {
table = getIcebergTable(tid);
} catch (NoSuchTableException e) {
try {
if (gmce.getOperationType() == OperationType.drop_files ||
gmce.getOperationType() == OperationType.change_property) {
log.warn("Table {} does not exist, skip processing this {} event", tid.toString(), gmce.getOperationType());
return;
}
table = createTable(gmce, tableSpec);
tableMetadata.table = Optional.of(table);
} catch (Exception e1) {
log.error("skip processing {} for table {}.{} due to error when creating table", gmce.toString(),
tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
log.debug(e1.toString());
return;
}
}
if(tableMetadata.completenessEnabled) {
tableMetadata.completionWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
if (tableMetadata.totalCountCompletenessEnabled) {
tableMetadata.totalCountCompletionWatermark = Long.parseLong(
table.properties().getOrDefault(TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
}
}
computeCandidateSchema(gmce, tid, tableSpec);
tableMetadata.ensureTxnInit();
tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
switch (gmce.getOperationType()) {
case add_files: {
updateTableProperty(tableSpec, tid, gmce);
addFiles(gmce, newSpecsMap, table, tableMetadata);
if (gmce.getAuditCountMap() != null && auditWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName())) {
tableMetadata.serializedAuditCountMaps.add(gmce.getAuditCountMap());
}
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
break;
}
case rewrite_files: {
updateTableProperty(tableSpec, tid, gmce);
rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata);
break;
}
case drop_files: {
dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
break;
}
case change_property: {
updateTableProperty(tableSpec, tid, gmce);
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
log.info("No file operation need to be performed by Iceberg Metadata Writer at this point.");
break;
}
default: {
log.error("unsupported operation {}", gmce.getOperationType().toString());
return;
}
}
}