in flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java [124:184]
/**
 * Commits the given committables to Pinot.
 *
 * <p>The committables are first combined into a single {@link PinotSinkGlobalCommittable} and
 * passed through {@code filterRecoveredCommittables}. One {@code SegmentCommitter} task is then
 * submitted to the thread pool for every data file path of the global committable. The data
 * files are deleted from the shared file system only if every segment commit reported
 * success; otherwise they are kept and the failed global committable is logged.
 *
 * @param collection committables to commit; an empty collection is a no-op
 * @throws IOException declared for the helpers called here (combining / filtering / cleanup)
 * @throws InterruptedException declared for the helpers called here; an interruption while
 *     waiting for a segment commit result is handled locally by marking the commit as failed
 *     and restoring the thread's interrupt flag
 */
public void commitSink(Collection<PinotSinkCommittable> collection) throws IOException, InterruptedException {
    if (collection.isEmpty()) {
        return;
    }

    // List of failed global committables that can be retried later on
    List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();

    PinotSinkGlobalCommittable globalCommittable = this.combine(new ArrayList<>(collection));
    filterRecoveredCommittables(globalCommittable);

    // Read the path list once, after filtering, and reuse it for submission and cleanup
    List<String> dataFilePaths = globalCommittable.getDataFilePaths();

    // Futures are kept in submission order so results are checked in a deterministic order
    List<Future<Boolean>> resultFutures = new ArrayList<>(dataFilePaths.size());

    // Commit all segments in globalCommittable
    for (int sequenceId = 0; sequenceId < dataFilePaths.size(); sequenceId++) {
        String dataFilePath = dataFilePaths.get(sequenceId);
        // Get segment names with increasing sequenceIds
        String segmentName = getSegmentName(globalCommittable, sequenceId);
        // Segment committer handling the whole commit process for a single segment
        Callable<Boolean> segmentCommitter = new SegmentCommitter(
                pinotControllerHost, pinotControllerPort, tempDirectory, fsAdapter,
                dataFilePath, segmentName, tableSchema, tableConfig, timeColumnName,
                segmentTimeUnit
        );
        // Submits the segment committer to the thread pool
        resultFutures.add(pool.submit(segmentCommitter));
    }

    boolean commitSucceeded = true;
    try {
        for (Future<Boolean> wasSuccessful : resultFutures) {
            // In case any of the segment commits wasn't successful we mark the whole
            // globalCommittable as failed
            if (!wasSuccessful.get()) {
                commitSucceeded = false;
                failedCommits.add(globalCommittable);
                // Once any of the commits failed, we do not need to check the remaining
                // ones, as we try to commit the globalCommittable next time
                break;
            }
        }
    } catch (InterruptedException e) {
        // The outcome of the commit is unknown: treat it as failed so that the data files
        // are NOT deleted below, and restore the interrupt flag for the caller.
        commitSucceeded = false;
        failedCommits.add(globalCommittable);
        Thread.currentThread().interrupt();
        LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e);
    } catch (ExecutionException e) {
        // A segment committer threw: mark the whole globalCommittable as failed so that
        // the data files stay available for a retry.
        commitSucceeded = false;
        failedCommits.add(globalCommittable);
        LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e);
    }

    if (commitSucceeded) {
        // If commit succeeded, cleanup the data files stored on the shared file system. In
        // case the commit of at least one of the segments failed, nothing will be cleaned
        // up here to enable retrying failed commits (data files must therefore stay
        // available on the shared filesystem).
        for (String path : dataFilePaths) {
            fsAdapter.deleteFromSharedFileSystem(path);
        }
    }

    if (!failedCommits.isEmpty()) {
        LOG.error(failedCommits.toString());
    }
}