in processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java [361:563]
/**
 * Merges and writes every aggregate projection of the segments being merged, and returns the segment
 * {@link Metadata} updated with the merged projection specs (including the final merged row count of each).
 * <p>
 * For each projection spec this: collects the per-segment projection adapters, merges dimension value
 * dictionaries (re-using a parent merger when the dimension also exists on the base table), walks a combining
 * iterator over all rows to serialize the optional time column, metric columns, and dimension columns, tracks
 * how original row numbers map to merged row numbers, and finally writes the finished columns into the smoosher
 * under projection-specific file names.
 *
 * @param smoosher              target smoosh file to write the projection columns into
 * @param projections           projection specs to merge; assumed already validated as identical across segments
 * @param adapters              adapters for the base-table segments being merged, one per segment
 * @param indexSpec             serialization settings (compression, bitmap serde, etc.)
 * @param segmentWriteOutMedium scratch medium for column writers
 * @param progress              progress indicator; a section is opened per projection for walking and for building
 * @param segmentBaseDir        base directory of the segment being written
 * @param closer                closer that takes ownership of iterators/resources created here
 * @param parentMergers         base-table dimension mergers keyed by dimension name, so projection dimensions
 *                              shared with the base table can re-use the parent's value dictionary
 * @param segmentMetadata       metadata of the segment being written; returned with projections attached
 * @return {@code segmentMetadata} with the merged projection metadata (schema + merged row count) attached
 * @throws IOException if any column serialization or smoosh write fails
 */
private Metadata makeProjections(
final FileSmoosher smoosher,
final List<AggregateProjectionMetadata> projections,
final List<IndexableAdapter> adapters,
final IndexSpec indexSpec,
final SegmentWriteOutMedium segmentWriteOutMedium,
final ProgressIndicator progress,
final File segmentBaseDir,
final Closer closer,
final Map<String, DimensionMergerV9> parentMergers,
final Metadata segmentMetadata
) throws IOException
{
final List<AggregateProjectionMetadata> projectionMetadata = Lists.newArrayListWithCapacity(projections.size());
for (AggregateProjectionMetadata spec : projections) {
// fetch this projection's adapter from each segment being merged
final List<IndexableAdapter> projectionAdapters = Lists.newArrayListWithCapacity(adapters.size());
final AggregateProjectionMetadata.Schema projectionSchema = spec.getSchema();
for (IndexableAdapter adapter : adapters) {
projectionAdapters.add(adapter.getProjectionAdapter(projectionSchema.getName()));
}
// we can use the first adapter to get the dimensions and metrics because the projection schema should be
// identical across all segments. This is validated by segment metadata merging
final List<String> dimensions = projectionAdapters.get(0).getDimensionNames(false);
final List<String> metrics = Arrays.stream(projectionSchema.getAggregators())
.map(AggregatorFactory::getName)
.collect(Collectors.toList());
final List<DimensionMergerV9> mergers = new ArrayList<>();
final Map<String, ColumnFormat> columnFormats = Maps.newLinkedHashMapWithExpectedSize(dimensions.size() + metrics.size());
// set up a merger per dimension and merge the value dictionaries up front; if the dimension also exists on
// the base table, attach to the parent merger so the projection shares its dictionary instead of re-merging
for (String dimension : dimensions) {
final ColumnFormat dimensionFormat = projectionAdapters.get(0).getFormat(dimension);
columnFormats.put(dimension, dimensionFormat);
DimensionHandler handler = dimensionFormat.getColumnHandler(dimension);
DimensionMergerV9 merger = handler.makeMerger(
Projections.getProjectionSmooshV9FileName(spec, dimension),
indexSpec,
segmentWriteOutMedium,
dimensionFormat.toColumnCapabilities(),
progress,
segmentBaseDir,
closer
);
if (parentMergers.containsKey(dimension)) {
merger.attachParent(parentMergers.get(dimension), projectionAdapters);
} else {
merger.writeMergedValueDictionary(projectionAdapters);
}
mergers.add(merger);
}
for (String metric : metrics) {
columnFormats.put(metric, projectionAdapters.get(0).getFormat(metric));
}
// the time column is optional for projections; only set up a writer when the schema declares one
final GenericColumnSerializer timeWriter;
if (projectionSchema.getTimeColumnName() != null) {
timeWriter = setupTimeWriter(segmentWriteOutMedium, indexSpec);
} else {
timeWriter = null;
}
final ArrayList<GenericColumnSerializer> metricWriters =
setupMetricsWriters(
segmentWriteOutMedium,
metrics,
columnFormats,
indexSpec,
Projections.getProjectionSmooshV9Prefix(spec)
);
// rows with identical dims across segments are combined by re-aggregating; the cast below (to
// RowCombiningTimeAndDimsIterator) relies on this function always producing that concrete type
Function<List<TransformableRowIterator>, TimeAndDimsIterator> rowMergerFn =
rowIterators -> new RowCombiningTimeAndDimsIterator(rowIterators, projectionSchema.getAggregators(), metrics);
List<TransformableRowIterator> perIndexRowIterators = Lists.newArrayListWithCapacity(projectionAdapters.size());
for (int i = 0; i < projectionAdapters.size(); ++i) {
final IndexableAdapter adapter = projectionAdapters.get(i);
TransformableRowIterator target = adapter.getRows();
perIndexRowIterators.add(IndexMerger.toMergedIndexRowIterator(target, i, mergers));
}
final TimeAndDimsIterator timeAndDimsIterator = rowMergerFn.apply(perIndexRowIterators);
closer.register(timeAndDimsIterator);
int rowCount = 0;
// one conversion buffer per input segment: maps original row number -> merged row number, INVALID_ROW for
// rows that were combined away; consumed later by merger.writeIndexes to remap bitmap indexes
List<IntBuffer> rowNumConversions = new ArrayList<>(projectionAdapters.size());
for (IndexableAdapter adapter : projectionAdapters) {
int[] arr = new int[adapter.getNumRows()];
Arrays.fill(arr, INVALID_ROW);
rowNumConversions.add(IntBuffer.wrap(arr));
}
final String section = "walk through and merge projection[" + projectionSchema.getName() + "] rows";
progress.startSection(section);
long startTime = System.currentTimeMillis();
long time = startTime;
// single pass over the merged row stream: serialize time/metrics/dims for each output row and record the
// row-number mapping for every original row that contributed to it
while (timeAndDimsIterator.moveToNext()) {
progress.progress();
TimeAndDimsPointer timeAndDims = timeAndDimsIterator.getPointer();
if (timeWriter != null) {
timeWriter.serialize(timeAndDims.timestampSelector);
}
for (int metricIndex = 0; metricIndex < timeAndDims.getNumMetrics(); metricIndex++) {
metricWriters.get(metricIndex).serialize(timeAndDims.getMetricSelector(metricIndex));
}
for (int dimIndex = 0; dimIndex < timeAndDims.getNumDimensions(); dimIndex++) {
DimensionMergerV9 merger = mergers.get(dimIndex);
if (merger.hasOnlyNulls()) {
// all-null columns get a synthetic NullColumnPartSerde below instead of merged row data
continue;
}
merger.processMergedRow(timeAndDims.getDimensionSelector(dimIndex));
}
// safe: rowMergerFn above always constructs a RowCombiningTimeAndDimsIterator
RowCombiningTimeAndDimsIterator comprisedRows = (RowCombiningTimeAndDimsIterator) timeAndDimsIterator;
// for each original iterator that contributed rows to this merged row, mark all of its combined original
// row numbers as mapping to the current output rowCount
for (int originalIteratorIndex = comprisedRows.nextCurrentlyCombinedOriginalIteratorIndex(0);
originalIteratorIndex >= 0;
originalIteratorIndex =
comprisedRows.nextCurrentlyCombinedOriginalIteratorIndex(originalIteratorIndex + 1)) {
IntBuffer conversionBuffer = rowNumConversions.get(originalIteratorIndex);
int minRowNum = comprisedRows.getMinCurrentlyCombinedRowNumByOriginalIteratorIndex(originalIteratorIndex);
int maxRowNum = comprisedRows.getMaxCurrentlyCombinedRowNumByOriginalIteratorIndex(originalIteratorIndex);
for (int rowNum = minRowNum; rowNum <= maxRowNum; rowNum++) {
// pad any skipped original rows so the buffer position stays aligned with the original row number
while (conversionBuffer.position() < rowNum) {
conversionBuffer.put(INVALID_ROW);
}
conversionBuffer.put(rowCount);
}
}
if ((++rowCount % 500000) == 0) {
log.debug(
"walked 500,000/%d rows of projection[%s] in %,d millis.",
rowCount,
projectionSchema.getName(),
System.currentTimeMillis() - time
);
time = System.currentTimeMillis();
}
}
// rewind so writeIndexes reads the conversion buffers from the beginning
for (IntBuffer rowNumConversion : rowNumConversions) {
rowNumConversion.rewind();
}
log.debug(
"completed walk through of %,d rows of projection[%s] in %,d millis.",
rowCount,
projectionSchema.getName(),
System.currentTimeMillis() - startTime
);
progress.stopSection(section);
final String section2 = "build projection[" + projectionSchema.getName() + "] inverted index and columns";
progress.startSection(section2);
// flush the finished writers into the smoosher under projection-scoped file names
if (projectionSchema.getTimeColumnName() != null) {
makeTimeColumn(
smoosher,
progress,
timeWriter,
indexSpec,
Projections.getProjectionSmooshV9FileName(spec, projectionSchema.getTimeColumnName())
);
}
makeMetricsColumns(
smoosher,
progress,
metrics,
columnFormats,
metricWriters,
indexSpec,
Projections.getProjectionSmooshV9Prefix(spec)
);
for (int i = 0; i < dimensions.size(); i++) {
final String dimension = dimensions.get(i);
final DimensionMergerV9 merger = mergers.get(i);
merger.writeIndexes(rowNumConversions);
final ColumnDescriptor columnDesc;
if (merger.hasOnlyNulls()) {
// synthetic null column descriptor if merger participates in generic null column stuff
// always write a null column if hasOnlyNulls is true. This is correct regardless of how storeEmptyColumns is
// set because:
// - if storeEmptyColumns is true, the base table also does this,
// - if storeEmptyColumns is false, the base table omits the column from the dimensions list as if it does not
// exist, however for projections the dimensions list is always populated by the projection schema, so a
// column always needs to exist to not run into null pointer exceptions.
columnDesc = ColumnDescriptor
.builder()
.setValueType(columnFormats.get(dimension).getLogicalType().getType())
.addSerde(new NullColumnPartSerde(rowCount, indexSpec.getBitmapSerdeFactory()))
.build();
} else {
// use merger descriptor, merger either has values or handles it own null column storage details
columnDesc = merger.makeColumnDescriptor();
}
makeColumn(smoosher, Projections.getProjectionSmooshV9FileName(spec, dimension), columnDesc);
}
progress.stopSection(section2);
// record the schema with the final merged row count for this projection
projectionMetadata.add(new AggregateProjectionMetadata(projectionSchema, rowCount));
}
return segmentMetadata.withProjections(projectionMetadata);
}