in flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java [402:434]
/**
 * Builds one segment from a JSON data file and, if requested, verifies it by loading it.
 *
 * <p>The segment is configured from this committer's table config, table schema, segment name,
 * segment time unit and time column name, and is written to {@code <outDir>/<segmentName>}.
 *
 * @param dataFile                  input file, read as {@code FileFormat.JSON}
 * @param outDir                    directory the segment directory is created in
 * @param _postCreationVerification when {@code true}, the freshly built segment is loaded in
 *                                  mmap mode and destroyed again as a sanity check; must not be
 *                                  {@code null} (unboxing a {@code null} fails after the segment
 *                                  has been built and is reported as a generation error)
 * @throws RuntimeException if building or verifying the segment fails; the original exception
 *                          is attached as the cause
 */
private void generateSegment(File dataFile, File outDir, Boolean _postCreationVerification) {
    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, tableSchema);
    segmentGeneratorConfig.setSegmentName(segmentName);
    segmentGeneratorConfig.setSegmentTimeUnit(segmentTimeUnit);
    segmentGeneratorConfig.setTimeColumnName(timeColumnName);
    segmentGeneratorConfig.setInputFilePath(dataFile.getPath());
    segmentGeneratorConfig.setFormat(FileFormat.JSON);
    segmentGeneratorConfig.setOutDir(outDir.getPath());
    segmentGeneratorConfig.setTableName(tableConfig.getTableName());
    try {
        SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
        driver.init(segmentGeneratorConfig);
        driver.build();
        File indexDir = new File(outDir, segmentName);
        LOG.debug("Successfully created segment: {} in directory: {}", segmentName, indexDir);
        if (_postCreationVerification) {
            LOG.debug("Verifying the segment by loading it");
            ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
            try {
                LOG.debug("Successfully loaded segment: {} of size: {} bytes", segmentName,
                        segment.getSegmentSizeBytes());
            } finally {
                // Always release the loaded segment, even if reading its size or logging fails.
                segment.destroy();
            }
        }
    }
    // SegmentIndexCreationDriverImpl throws generic Exceptions during init and build
    // ImmutableSegmentLoader throws generic Exception during load
    catch (Exception e) {
        String message = String.format("Error while generating segment from file %s", dataFile.getAbsolutePath());
        LOG.error(message, e);
        // Keep the original exception as the cause so callers see the root failure.
        throw new RuntimeException(message, e);
    }
    LOG.debug("Successfully created 1 segment from data file: {}", dataFile);
}