public void commitSink()

in flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java [124:184]


    public void commitSink(Collection<PinotSinkCommittable> collection) throws IOException, InterruptedException {
        if (collection.isEmpty()) return;

        // List of failed global committables that can be retried later on
        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();

        PinotSinkGlobalCommittable globalCommittable = this.combine(new ArrayList<>(collection));

        filterRecoveredCommittables(globalCommittable);

        Set<Future<Boolean>> resultFutures = new HashSet<>();
        // Commit all segments in globalCommittable
        for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
            String dataFilePath = globalCommittable.getDataFilePaths().get(sequenceId);
            // Get segment names with increasing sequenceIds
            String segmentName = getSegmentName(globalCommittable, sequenceId);
            // Segment committer handling the whole commit process for a single segment
            Callable<Boolean> segmentCommitter = new SegmentCommitter(
                    pinotControllerHost, pinotControllerPort, tempDirectory, fsAdapter,
                    dataFilePath, segmentName, tableSchema, tableConfig, timeColumnName,
                    segmentTimeUnit
            );
            // Submits the segment committer to the thread pool
            resultFutures.add(pool.submit(segmentCommitter));
        }

        boolean commitSucceeded = true;
        try {
            for (Future<Boolean> wasSuccessful : resultFutures) {
                // In case any of the segment commits wasn't successful we mark the whole
                // globalCommittable as failed
                if (!wasSuccessful.get()) {
                    commitSucceeded = false;
                    failedCommits.add(globalCommittable);
                    // Once any of the commits failed, we do not need to check the remaining
                    // ones, as we try to commit the globalCommittable next time
                    break;
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            // In case of an exception thrown while accessing commit status, mark the whole
            // globalCommittable as failed
            failedCommits.add(globalCommittable);
            LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e);
        }

        if (commitSucceeded) {
            // If commit succeeded, cleanup the data files stored on the shared file system. In
            // case the commit of at least one of the segments failed, nothing will be cleaned
            // up here to enable retrying failed commits (data files must therefore stay
            // available on the shared filesystem).
            for (String path : globalCommittable.getDataFilePaths()) {
                fsAdapter.deleteFromSharedFileSystem(path);
            }
        }

        if (failedCommits.size() > 0) {
            LOG.error(failedCommits.toString());
        }

    }