in kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java [180:273]
/**
 * Commits the files reported in {@code envelopeList} to the table named by {@code
 * tableReference} and announces the resulting snapshot with a {@link CommitToTable} event.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>Load the table; if it does not exist, log a warning and return without committing.
 *   <li>Drop envelopes whose offset is below the value that {@code lastCommittedOffsetsForTable}
 *       reports for their partition (partitions with no entry are always kept).
 *   <li>Gather data files and delete files from the remaining payloads, ignoring null lists and
 *       files with a record count of zero, and passing each through {@code
 *       distinctByKey(ContentFile::location)}.
 *   <li>Abort if the coordinator has been terminated.
 *   <li>If there is nothing left, log and return; otherwise commit an append (no delete files)
 *       or a row delta (at least one delete file), then send the event and log the outcome.
 * </ol>
 *
 * @param tableReference reference to the target table
 * @param envelopeList envelopes whose payloads are cast to {@link DataWritten}
 * @param offsetsJson value stored under the {@code snapshotOffsetsProp} snapshot property
 * @param validThroughTs optional timestamp; when non-null it is stored under {@code
 *     VALID_THROUGH_TS_SNAPSHOT_PROP} and forwarded in the event
 * @throws ConnectException if {@code terminated} is set when the commit is about to be made
 */
private void commitToTable(
    TableReference tableReference,
    List<Envelope> envelopeList,
    String offsetsJson,
    OffsetDateTime validThroughTs) {
  TableIdentifier tableIdentifier = tableReference.identifier();

  Table table;
  try {
    table = catalog.loadTable(tableIdentifier);
  } catch (NoSuchTableException e) {
    LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e);
    return;
  }

  String branch = config.tableConfig(tableIdentifier.toString()).commitBranch();
  Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);

  // Keep only envelopes at or beyond the recorded offset for their partition.
  List<DataWritten> payloads =
      envelopeList.stream()
          .filter(
              envelope -> {
                Long committed = committedOffsets.get(envelope.partition());
                if (committed == null) {
                  return true;
                }
                return envelope.offset() >= committed;
              })
          .map(envelope -> (DataWritten) envelope.event().payload())
          .collect(Collectors.toList());

  List<DataFile> dataFiles =
      payloads.stream()
          .map(DataWritten::dataFiles)
          .filter(files -> files != null)
          .flatMap(List::stream)
          .filter(file -> file.recordCount() > 0)
          .filter(distinctByKey(ContentFile::location))
          .collect(Collectors.toList());

  List<DeleteFile> deleteFiles =
      payloads.stream()
          .map(DataWritten::deleteFiles)
          .filter(files -> files != null)
          .flatMap(List::stream)
          .filter(file -> file.recordCount() > 0)
          .filter(distinctByKey(ContentFile::location))
          .collect(Collectors.toList());

  // Checked after the file lists are built and before anything is committed.
  if (terminated) {
    throw new ConnectException("Coordinator is terminated, commit aborted");
  }

  if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
    LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
    return;
  }

  if (!deleteFiles.isEmpty()) {
    // At least one delete file: commit data and deletes together as a row delta.
    RowDelta rowDelta = table.newRowDelta();
    if (branch != null) {
      rowDelta.toBranch(branch);
    }
    rowDelta.set(snapshotOffsetsProp, offsetsJson);
    rowDelta.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
    if (validThroughTs != null) {
      rowDelta.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
    }
    for (DataFile file : dataFiles) {
      rowDelta.addRows(file);
    }
    for (DeleteFile file : deleteFiles) {
      rowDelta.addDeletes(file);
    }
    rowDelta.commit();
  } else {
    // Data files only: a plain append is used.
    AppendFiles append = table.newAppend();
    if (branch != null) {
      append.toBranch(branch);
    }
    append.set(snapshotOffsetsProp, offsetsJson);
    append.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
    if (validThroughTs != null) {
      append.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
    }
    for (DataFile file : dataFiles) {
      append.appendFile(file);
    }
    append.commit();
  }

  // Report the snapshot that latestSnapshot returns for the branch after the commit.
  Long snapshotId = latestSnapshot(table, branch).snapshotId();
  CommitToTable commitPayload =
      new CommitToTable(
          commitState.currentCommitId(), tableReference, snapshotId, validThroughTs);
  send(new Event(config.connectGroupId(), commitPayload));

  LOG.info(
      "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}",
      tableIdentifier,
      snapshotId,
      commitState.currentCommitId(),
      validThroughTs);
}