private void generateSegment()

in flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java [402:434]


        /**
         * Builds a single Pinot segment from a JSON data file and optionally verifies it by
         * loading the freshly created segment.
         *
         * <p>The segment is configured from the enclosing instance's {@code tableConfig},
         * {@code tableSchema}, {@code segmentName}, {@code segmentTimeUnit} and
         * {@code timeColumnName}; the resulting index directory is {@code outDir/segmentName}.
         *
         * @param dataFile                  JSON file whose records are turned into the segment
         * @param outDir                    directory the segment index directory is created in
         * @param _postCreationVerification when {@code true}, the created segment is loaded once
         *                                  (memory-mapped) to verify it; {@code null} is treated
         *                                  as {@code false}
         * @throws RuntimeException if segment creation or verification fails; the original
         *                          exception is attached as the cause
         */
        private void generateSegment(File dataFile, File outDir, Boolean _postCreationVerification) {
            SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, tableSchema);
            segmentGeneratorConfig.setSegmentName(segmentName);
            segmentGeneratorConfig.setSegmentTimeUnit(segmentTimeUnit);
            segmentGeneratorConfig.setTimeColumnName(timeColumnName);
            segmentGeneratorConfig.setInputFilePath(dataFile.getPath());
            segmentGeneratorConfig.setFormat(FileFormat.JSON);
            segmentGeneratorConfig.setOutDir(outDir.getPath());
            segmentGeneratorConfig.setTableName(tableConfig.getTableName());

            try {
                SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
                driver.init(segmentGeneratorConfig);
                driver.build();
                File indexDir = new File(outDir, segmentName);
                LOG.debug("Successfully created segment: {} in directory: {}", segmentName, indexDir);
                // Boolean.TRUE.equals avoids an unboxing NullPointerException when the flag is null.
                if (Boolean.TRUE.equals(_postCreationVerification)) {
                    LOG.debug("Verifying the segment by loading it");
                    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
                    try {
                        LOG.debug("Successfully loaded segment: {} of size: {} bytes", segmentName,
                                segment.getSegmentSizeBytes());
                    } finally {
                        // Always release the loaded segment, even if inspecting it fails.
                        segment.destroy();
                    }
                }
            } catch (Exception e) {
                // SegmentIndexCreationDriverImpl throws generic Exceptions during init and build
                // ImmutableSegmentLoader throws generic Exception during load
                String message = String.format("Error while generating segment from file %s", dataFile.getAbsolutePath());
                LOG.error(message, e);
                // Keep the original exception as the cause so the root failure is not lost.
                throw new RuntimeException(message, e);
            }
            LOG.debug("Successfully created 1 segment from data file: {}", dataFile);
        }