in flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java [87:111]
public PinotSinkCommitter(String pinotControllerHost, String pinotControllerPort,
String tableName, SegmentNameGenerator segmentNameGenerator,
String tempDirPrefix, FileSystemAdapter fsAdapter,
String timeColumnName, TimeUnit segmentTimeUnit,
int numCommitThreads) throws IOException {
this.pinotControllerHost = checkNotNull(pinotControllerHost);
this.pinotControllerPort = checkNotNull(pinotControllerPort);
this.tableName = checkNotNull(tableName);
this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
this.fsAdapter = checkNotNull(fsAdapter);
this.timeColumnName = checkNotNull(timeColumnName);
this.segmentTimeUnit = checkNotNull(segmentTimeUnit);
this.pinotControllerClient = new PinotControllerClient(pinotControllerHost, pinotControllerPort);
// Create directory that temporary files will be stored in
this.tempDirectory = Files.createTempDirectory(tempDirPrefix).toFile();
// Retrieve the Pinot table schema and the Pinot table config from the Pinot controller
this.tableSchema = pinotControllerClient.getSchema(tableName);
this.tableConfig = pinotControllerClient.getTableConfig(tableName);
// We use a thread pool in order to parallelize the segment creation and segment upload
checkArgument(numCommitThreads > 0);
this.pool = Executors.newFixedThreadPool(numCommitThreads);
}