in flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java [355:392]
public Boolean call() {
// Local copy of data file stored on the shared filesystem
File segmentData = null;
// File containing the final Pinot segment
File segmentFile = null;
try {
// Download data file from the shared filesystem
LOG.debug("Downloading data file {} from shared file system...", dataFilePath);
List<String> serializedElements = fsAdapter.readFromSharedFileSystem(dataFilePath);
segmentData = FileSystemUtils.writeToLocalFile(serializedElements, tempDirectory);
LOG.debug("Successfully downloaded data file {} from shared file system", dataFilePath);
segmentFile = FileSystemUtils.createFileInDir(tempDirectory);
LOG.debug("Creating segment in " + segmentFile.getAbsolutePath());
// Creates a segment with name `segmentName` in `segmentFile`
generateSegment(segmentData, segmentFile, true);
// Uploads the recently created segment to the Pinot controller
uploadSegment(segmentFile);
// Commit successful
return true;
} catch (IOException e) {
LOG.error("Error while committing segment data stored on shared filesystem.", e);
// Commit failed
return false;
} finally {
// Finally cleanup all files created on the local filesystem
if (segmentData != null) {
segmentData.delete();
}
if (segmentFile != null) {
segmentFile.delete();
}
}
}