in processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java [131:346]
private File makeIndexFiles(
    final List<IndexableAdapter> adapters,
    final @Nullable AggregatorFactory[] metricAggs,
    final File outDir,
    final ProgressIndicator progress,
    final List<String> mergedDimensionsWithTime, // has both explicit and implicit dimensions, as well as __time
    final DimensionsSpecInspector dimensionsSpecInspector,
    final List<String> mergedMetrics,
    final Function<List<TransformableRowIterator>, TimeAndDimsIterator> rowMergerFn,
    final IndexSpec indexSpec,
    final @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException
{
  progress.start();
  progress.progress();
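
  // Collect each input segment's Metadata up front; it is merged into the output segment's metadata below.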
  List<Metadata> metadataList = Lists.transform(adapters, IndexableAdapter::getMetadata);

  // Merged dimensions without __time.
  List<String> mergedDimensions =
      mergedDimensionsWithTime.stream()
                              .filter(dim -> !ColumnHolder.TIME_COLUMN_NAME.equals(dim))
                              .collect(Collectors.toList());
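
  // Merge the per-segment metadata. When aggregators are provided, their combining factories are used,
  // since rows in the input segments are already aggregated.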
  Metadata segmentMetadata;
  if (metricAggs != null) {
    AggregatorFactory[] combiningMetricAggs = new AggregatorFactory[metricAggs.length];
    for (int i = 0; i < metricAggs.length; i++) {
      combiningMetricAggs[i] = metricAggs[i].getCombiningFactory();
    }
    segmentMetadata = Metadata.merge(
        metadataList,
        combiningMetricAggs
    );
  } else {
    segmentMetadata = Metadata.merge(
        metadataList,
        null
    );
  }
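
  // Defensive check: if the merged metadata declares a sort order, it must include __time.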
  if (segmentMetadata != null
      && segmentMetadata.getOrdering() != null
      && segmentMetadata.getOrdering()
                        .stream()
                        .noneMatch(orderBy -> ColumnHolder.TIME_COLUMN_NAME.equals(orderBy.getColumnName()))) {
    throw DruidException.defensive(
        "sortOrder[%s] must include[%s]",
        segmentMetadata.getOrdering(),
        ColumnHolder.TIME_COLUMN_NAME
    );
  }
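
  // Everything created while writing the segment is registered with this Closer and released in the finally block.
  // The FileSmoosher packs the many per-column files into a small number of "smoosh" files under outDir.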
  Closer closer = Closer.create();
  try {
    final FileSmoosher v9Smoosher = new FileSmoosher(outDir);
    FileUtils.mkdirp(outDir);

    SegmentWriteOutMediumFactory omf = segmentWriteOutMediumFactory != null ? segmentWriteOutMediumFactory
                                                                            : defaultSegmentWriteOutMediumFactory;
    log.debug("Using SegmentWriteOutMediumFactory[%s]", omf.getClass().getSimpleName());
    SegmentWriteOutMedium segmentWriteOutMedium = omf.makeSegmentWriteOutMedium(outDir);
    closer.register(segmentWriteOutMedium);
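
    // version.bin records the segment format version (V9) so IndexIO knows how to read the segment.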
    long startTime = System.currentTimeMillis();
    Files.asByteSink(new File(outDir, "version.bin")).write(Ints.toByteArray(IndexIO.V9_VERSION));
    log.debug("Completed version.bin in %,d millis.", System.currentTimeMillis() - startTime);
    progress.progress();
    startTime = System.currentTimeMillis();
    try (FileOutputStream fos = new FileOutputStream(new File(outDir, "factory.json"))) {
      SegmentizerFactory customSegmentLoader = indexSpec.getSegmentLoader();
      if (customSegmentLoader != null) {
        mapper.writeValue(fos, customSegmentLoader);
      } else {
        mapper.writeValue(fos, new MMappedQueryableSegmentizerFactory(indexIO));
      }
    }
    log.debug("Completed factory.json in %,d millis", System.currentTimeMillis() - startTime);
    progress.progress();
    final Map<String, ColumnFormat> metricFormats = new TreeMap<>(Comparators.naturalNullsFirst());
    final List<ColumnFormat> dimFormats = Lists.newArrayListWithCapacity(mergedDimensions.size());
    mergeFormat(adapters, mergedDimensions, metricFormats, dimFormats);

    final Map<String, DimensionHandler> handlers = makeDimensionHandlers(mergedDimensions, dimFormats);
    final Map<String, DimensionMergerV9> mergersMap = Maps.newHashMapWithExpectedSize(mergedDimensions.size());
    final List<DimensionMergerV9> mergers = new ArrayList<>();
    for (int i = 0; i < mergedDimensions.size(); i++) {
      DimensionHandler handler = handlers.get(mergedDimensions.get(i));
      DimensionMergerV9 merger = handler.makeMerger(
          mergedDimensions.get(i),
          indexSpec,
          segmentWriteOutMedium,
          dimFormats.get(i).toColumnCapabilities(),
          progress,
          outDir,
          closer
      );
      mergers.add(merger);
      mergersMap.put(mergedDimensions.get(i), merger);
    }
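
    // Any dimension used as a grouping column of a projection is marked as a "parent", so its merger knows
    // that projection columns will be built on top of it.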
    if (segmentMetadata != null && segmentMetadata.getProjections() != null) {
      for (AggregateProjectionMetadata projectionMetadata : segmentMetadata.getProjections()) {
        for (String dimension : projectionMetadata.getSchema().getGroupingColumns()) {
          DimensionMergerV9 merger = mergersMap.get(dimension);
          if (merger != null) {
            merger.markAsParent();
          }
        }
      }
    }

    /************* Setup Dim Conversions **************/
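    // Write each dimension's merged value dictionary and set up the per-segment id conversions used while merging rows.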
    progress.progress();
    startTime = System.currentTimeMillis();
    writeDimValuesAndSetupDimConversion(adapters, progress, mergedDimensions, mergers);
    log.debug("Completed dim conversions in %,d millis.", System.currentTimeMillis() - startTime);

    /************* Walk through data sets, merge them, and write merged columns *************/
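    // The merged iterator yields rows from all adapters in the segment's sort order. mergeIndexesAndWriteColumns
    // streams those rows into the time, metric, and dimension writers and records per-adapter row-number mappings,
    // which are needed later to remap the bitmap indexes.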
    progress.progress();
    final TimeAndDimsIterator timeAndDimsIterator = makeMergedTimeAndDimsIterator(
        adapters,
        mergedDimensionsWithTime,
        mergedMetrics,
        rowMergerFn,
        handlers,
        mergers
    );
    closer.register(timeAndDimsIterator);
    final GenericColumnSerializer timeWriter = setupTimeWriter(segmentWriteOutMedium, indexSpec);
    final ArrayList<GenericColumnSerializer> metricWriters =
        setupMetricsWriters(segmentWriteOutMedium, mergedMetrics, metricFormats, indexSpec);
    IndexMergeResult indexMergeResult = mergeIndexesAndWriteColumns(
        adapters,
        progress,
        timeAndDimsIterator,
        timeWriter,
        metricWriters,
        mergers
    );

    /************ Create Inverted Indexes and Finalize Build Columns *************/
    final String section = "build inverted index and columns";
    progress.startSection(section);
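
    // Publish the __time column and the metric columns from their serializers into the smoosher.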
    makeTimeColumn(v9Smoosher, progress, timeWriter, indexSpec);
    makeMetricsColumns(
        v9Smoosher,
        progress,
        mergedMetrics,
        metricFormats,
        metricWriters,
        indexSpec
    );
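
    // For each dimension: write its indexes (remapped through rowNumConversions) and publish the column. A dimension
    // that turned out to be all nulls is either written as an explicit null column or skipped entirely, depending on
    // whether the DimensionsSpec says it should be stored.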
    for (int i = 0; i < mergedDimensions.size(); i++) {
      DimensionMergerV9 merger = mergers.get(i);
      merger.writeIndexes(indexMergeResult.rowNumConversions);
      if (!merger.hasOnlyNulls()) {
        ColumnDescriptor columnDesc = merger.makeColumnDescriptor();
        makeColumn(v9Smoosher, mergedDimensions.get(i), columnDesc);
      } else if (dimensionsSpecInspector.shouldStore(mergedDimensions.get(i))) {
        // shouldStore AND hasOnlyNulls
        ColumnDescriptor columnDesc = ColumnDescriptor
            .builder()
            .setValueType(dimFormats.get(i).getLogicalType().getType())
            .addSerde(new NullColumnPartSerde(indexMergeResult.rowCount, indexSpec.getBitmapSerdeFactory()))
            .build();
        makeColumn(v9Smoosher, mergedDimensions.get(i), columnDesc);
      }
    }
    progress.stopSection(section);
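
    // If the metadata declares projections, build and write their pre-aggregated columns as well; makeProjections
    // returns an updated copy of the segment metadata.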
    if (segmentMetadata != null && !CollectionUtils.isNullOrEmpty(segmentMetadata.getProjections())) {
      segmentMetadata = makeProjections(
          v9Smoosher,
          segmentMetadata.getProjections(),
          adapters,
          indexSpec,
          segmentWriteOutMedium,
          progress,
          outDir,
          closer,
          mergersMap,
          segmentMetadata
      );
    }

    /************* Make index.drd & metadata.drd files **************/
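    // index.drd records the segment's column and dimension names plus the bitmap serde factory; metadata.drd stores
    // the merged Metadata (aggregators, ordering, projections, and so on).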
    progress.progress();
    makeIndexBinary(
        v9Smoosher,
        adapters,
        outDir,
        mergedDimensions,
        mergedMetrics,
        progress,
        indexSpec,
        mergers,
        dimensionsSpecInspector
    );
    makeMetadataBinary(v9Smoosher, progress, segmentMetadata);

    v9Smoosher.close();

    progress.stop();

    return outDir;
  }
  catch (Throwable t) {
    throw closer.rethrow(t);
  }
  finally {
    closer.close();
  }
}