private Metadata makeProjections()

in processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java [361:563]


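  /**
   * Merges each {@link AggregateProjectionMetadata} projection across all {@link IndexableAdapter}, writing the
   * projection's time, metric, and dimension columns to the smoosher, and returns the segment {@link Metadata}
   * updated with the merged projection metadata.
   */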
  private Metadata makeProjections(
      final FileSmoosher smoosher,
      final List<AggregateProjectionMetadata> projections,
      final List<IndexableAdapter> adapters,
      final IndexSpec indexSpec,
      final SegmentWriteOutMedium segmentWriteOutMedium,
      final ProgressIndicator progress,
      final File segmentBaseDir,
      final Closer closer,
      final Map<String, DimensionMergerV9> parentMergers,
      final Metadata segmentMetadata
  ) throws IOException
  {
    final List<AggregateProjectionMetadata> projectionMetadata = Lists.newArrayListWithCapacity(projections.size());
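    // Merge each projection independently; its columns are written into the same smoosher under
    // projection-specific file names.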
    for (AggregateProjectionMetadata spec : projections) {
      final List<IndexableAdapter> projectionAdapters = Lists.newArrayListWithCapacity(adapters.size());
      final AggregateProjectionMetadata.Schema projectionSchema = spec.getSchema();
      for (IndexableAdapter adapter : adapters) {
        projectionAdapters.add(adapter.getProjectionAdapter(projectionSchema.getName()));
      }
      // we can use the first adapter to get the dimension names and column formats because the projection schema
      // should be identical across all segments; this is validated by segment metadata merging
      final List<String> dimensions = projectionAdapters.get(0).getDimensionNames(false);
      final List<String> metrics = Arrays.stream(projectionSchema.getAggregators())
                                         .map(AggregatorFactory::getName)
                                         .collect(Collectors.toList());

      final List<DimensionMergerV9> mergers = new ArrayList<>();
      final Map<String, ColumnFormat> columnFormats = Maps.newLinkedHashMapWithExpectedSize(dimensions.size() + metrics.size());

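      // Set up a merger per projection dimension. When the dimension also exists in the base table, attach to the
      // parent merger rather than building a separate value dictionary.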
      for (String dimension : dimensions) {
        final ColumnFormat dimensionFormat = projectionAdapters.get(0).getFormat(dimension);
        columnFormats.put(dimension, dimensionFormat);
        DimensionHandler handler = dimensionFormat.getColumnHandler(dimension);
        DimensionMergerV9 merger = handler.makeMerger(
            Projections.getProjectionSmooshV9FileName(spec, dimension),
            indexSpec,
            segmentWriteOutMedium,
            dimensionFormat.toColumnCapabilities(),
            progress,
            segmentBaseDir,
            closer
        );
        if (parentMergers.containsKey(dimension)) {
          merger.attachParent(parentMergers.get(dimension), projectionAdapters);
        } else {
          merger.writeMergedValueDictionary(projectionAdapters);
        }
        mergers.add(merger);
      }
      for (String metric : metrics) {
        columnFormats.put(metric, projectionAdapters.get(0).getFormat(metric));
      }

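      // Only set up a time writer if the projection schema defines a time column.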
      final GenericColumnSerializer timeWriter;
      if (projectionSchema.getTimeColumnName() != null) {
        timeWriter = setupTimeWriter(segmentWriteOutMedium, indexSpec);
      } else {
        timeWriter = null;
      }
      final ArrayList<GenericColumnSerializer> metricWriters =
          setupMetricsWriters(
              segmentWriteOutMedium,
              metrics,
              columnFormats,
              indexSpec,
              Projections.getProjectionSmooshV9Prefix(spec)
          );

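      // Merge rows across all projection adapters, rolling up rows with identical time and dimension values using
      // the projection's aggregators.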
      Function<List<TransformableRowIterator>, TimeAndDimsIterator> rowMergerFn =
          rowIterators -> new RowCombiningTimeAndDimsIterator(rowIterators, projectionSchema.getAggregators(), metrics);

      List<TransformableRowIterator> perIndexRowIterators = Lists.newArrayListWithCapacity(projectionAdapters.size());
      for (int i = 0; i < projectionAdapters.size(); ++i) {
        final IndexableAdapter adapter = projectionAdapters.get(i);
        TransformableRowIterator target = adapter.getRows();
        perIndexRowIterators.add(IndexMerger.toMergedIndexRowIterator(target, i, mergers));
      }
      final TimeAndDimsIterator timeAndDimsIterator = rowMergerFn.apply(perIndexRowIterators);
      closer.register(timeAndDimsIterator);

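      // Per-adapter buffers mapping original row numbers to merged row numbers; used below when writing the
      // dimension indexes.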
      int rowCount = 0;
      List<IntBuffer> rowNumConversions = new ArrayList<>(projectionAdapters.size());
      for (IndexableAdapter adapter : projectionAdapters) {
        int[] arr = new int[adapter.getNumRows()];
        Arrays.fill(arr, INVALID_ROW);
        rowNumConversions.add(IntBuffer.wrap(arr));
      }

      final String section = "walk through and merge projection[" + projectionSchema.getName() + "] rows";
      progress.startSection(section);
      long startTime = System.currentTimeMillis();
      long time = startTime;
      while (timeAndDimsIterator.moveToNext()) {
        progress.progress();
        TimeAndDimsPointer timeAndDims = timeAndDimsIterator.getPointer();
        if (timeWriter != null) {
          timeWriter.serialize(timeAndDims.timestampSelector);
        }

        for (int metricIndex = 0; metricIndex < timeAndDims.getNumMetrics(); metricIndex++) {
          metricWriters.get(metricIndex).serialize(timeAndDims.getMetricSelector(metricIndex));
        }

        for (int dimIndex = 0; dimIndex < timeAndDims.getNumDimensions(); dimIndex++) {
          DimensionMergerV9 merger = mergers.get(dimIndex);
          if (merger.hasOnlyNulls()) {
            continue;
          }
          merger.processMergedRow(timeAndDims.getDimensionSelector(dimIndex));
        }

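        // Record which original rows were combined into this merged row so each adapter's conversion buffer maps
        // them to the merged row number.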
        RowCombiningTimeAndDimsIterator comprisedRows = (RowCombiningTimeAndDimsIterator) timeAndDimsIterator;

        for (int originalIteratorIndex = comprisedRows.nextCurrentlyCombinedOriginalIteratorIndex(0);
             originalIteratorIndex >= 0;
             originalIteratorIndex =
                 comprisedRows.nextCurrentlyCombinedOriginalIteratorIndex(originalIteratorIndex + 1)) {

          IntBuffer conversionBuffer = rowNumConversions.get(originalIteratorIndex);
          int minRowNum = comprisedRows.getMinCurrentlyCombinedRowNumByOriginalIteratorIndex(originalIteratorIndex);
          int maxRowNum = comprisedRows.getMaxCurrentlyCombinedRowNumByOriginalIteratorIndex(originalIteratorIndex);

          for (int rowNum = minRowNum; rowNum <= maxRowNum; rowNum++) {
            while (conversionBuffer.position() < rowNum) {
              conversionBuffer.put(INVALID_ROW);
            }
            conversionBuffer.put(rowCount);
          }
        }
        if ((++rowCount % 500000) == 0) {
          log.debug(
              "walked 500,000/%d rows of projection[%s] in %,d millis.",
              rowCount,
              projectionSchema.getName(),
              System.currentTimeMillis() - time
          );
          time = System.currentTimeMillis();
        }
      }
      for (IntBuffer rowNumConversion : rowNumConversions) {
        rowNumConversion.rewind();
      }
      log.debug(
          "completed walk through of %,d rows of projection[%s] in %,d millis.",
          rowCount,
          projectionSchema.getName(),
          System.currentTimeMillis() - startTime
      );
      progress.stopSection(section);

      final String section2 = "build projection[" + projectionSchema.getName() + "] inverted index and columns";
      progress.startSection(section2);
      if (projectionSchema.getTimeColumnName() != null) {
        makeTimeColumn(
            smoosher,
            progress,
            timeWriter,
            indexSpec,
            Projections.getProjectionSmooshV9FileName(spec, projectionSchema.getTimeColumnName())
        );
      }
      makeMetricsColumns(
          smoosher,
          progress,
          metrics,
          columnFormats,
          metricWriters,
          indexSpec,
          Projections.getProjectionSmooshV9Prefix(spec)
      );

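      // Write each dimension column, remapping its indexes with the row number conversions built above.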
      for (int i = 0; i < dimensions.size(); i++) {
        final String dimension = dimensions.get(i);
        final DimensionMergerV9 merger = mergers.get(i);
        merger.writeIndexes(rowNumConversions);
        final ColumnDescriptor columnDesc;
        if (merger.hasOnlyNulls()) {
          // Write a synthetic null column descriptor when the merger saw only null values.
          // Always write a null column if hasOnlyNulls is true. This is correct regardless of how storeEmptyColumns
          // is set because:
          // - if storeEmptyColumns is true, the base table also writes a null column;
          // - if storeEmptyColumns is false, the base table omits the column from the dimensions list as if it did
          //   not exist. However, for projections the dimensions list is always populated from the projection
          //   schema, so the column must exist to avoid null pointer exceptions.
          columnDesc = ColumnDescriptor
              .builder()
              .setValueType(columnFormats.get(dimension).getLogicalType().getType())
              .addSerde(new NullColumnPartSerde(rowCount, indexSpec.getBitmapSerdeFactory()))
              .build();
        } else {
          // use merger descriptor: the merger either has values or handles its own null column storage details
          columnDesc = merger.makeColumnDescriptor();
        }
        makeColumn(smoosher, Projections.getProjectionSmooshV9FileName(spec, dimension), columnDesc);
      }

      progress.stopSection(section2);
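      // Record the projection schema along with its merged row count.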
      projectionMetadata.add(new AggregateProjectionMetadata(projectionSchema, rowCount));
    }
    return segmentMetadata.withProjections(projectionMetadata);
  }