public void write()

in gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java [310:382]


  public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
      Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
    TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata(this.conf));
    Table table;
    try {
      table = getIcebergTable(tid);
    } catch (NoSuchTableException e) {
      try {
        if (gmce.getOperationType() == OperationType.drop_files ||
            gmce.getOperationType() == OperationType.change_property) {
          log.warn("Table {} does not exist, skip processing this {} event", tid.toString(), gmce.getOperationType());
          return;
        }
        table = createTable(gmce, tableSpec);
        tableMetadata.table = Optional.of(table);
      } catch (Exception e1) {
        log.error("skip processing {} for table {}.{} due to error when creating table", gmce.toString(),
            tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
        log.debug(e1.toString());
        return;
      }
    }
    if(tableMetadata.completenessEnabled) {
      tableMetadata.completionWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
          String.valueOf(DEFAULT_COMPLETION_WATERMARK)));

      if (tableMetadata.totalCountCompletenessEnabled) {
        tableMetadata.totalCountCompletionWatermark = Long.parseLong(
            table.properties().getOrDefault(TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
                String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
      }
    }

    computeCandidateSchema(gmce, tid, tableSpec);
    tableMetadata.ensureTxnInit();
    tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
    switch (gmce.getOperationType()) {
      case add_files: {
        updateTableProperty(tableSpec, tid, gmce);
        addFiles(gmce, newSpecsMap, table, tableMetadata);
        if (gmce.getAuditCountMap() != null && auditWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
            tableSpec.getTable().getTableName())) {
          tableMetadata.serializedAuditCountMaps.add(gmce.getAuditCountMap());
        }
        if (gmce.getTopicPartitionOffsetsRange() != null) {
          mergeOffsets(gmce, tid);
        }
        break;
      }
      case rewrite_files: {
        updateTableProperty(tableSpec, tid, gmce);
        rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata);
        break;
      }
      case drop_files: {
        dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
        break;
      }
      case change_property: {
        updateTableProperty(tableSpec, tid, gmce);
        if (gmce.getTopicPartitionOffsetsRange() != null) {
          mergeOffsets(gmce, tid);
        }
        log.info("No file operation need to be performed by Iceberg Metadata Writer at this point.");
        break;
      }
      default: {
        log.error("unsupported operation {}", gmce.getOperationType().toString());
        return;
      }
    }
  }