public static SegmentAnalysis mergeAnalyses()

in processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java [267:451]


  public static SegmentAnalysis mergeAnalyses(
      Set<String> dataSources,
      SegmentAnalysis arg1,
      SegmentAnalysis arg2,
      AggregatorMergeStrategy aggregatorMergeStrategy
  )
  {
    if (arg1 == null) {
      return arg2;
    }

    if (arg2 == null) {
      return arg1;
    }

    // This is a defensive check since SegementMetadata query instantiation guarantees this
    if (CollectionUtils.isNullOrEmpty(dataSources)) {
      throw InvalidInput.exception("SegementMetadata queries require at least one datasource.");
    }

    SegmentId mergedSegmentId = null;

    // Union datasources can have multiple datasources. So we iterate through all the datasources to parse the segment id.
    for (String dataSource : dataSources) {
      final SegmentId id1 = SegmentId.tryParse(dataSource, arg1.getId());
      final SegmentId id2 = SegmentId.tryParse(dataSource, arg2.getId());

      // Swap arg1, arg2 so the later-ending interval is first. This ensures we prefer the latest column order.
      // We're preserving it so callers can see columns in their natural order.
      if (id1 != null && id2 != null) {
        if (id2.getIntervalEnd().isAfter(id1.getIntervalEnd()) ||
            (id2.getIntervalEnd().isEqual(id1.getIntervalEnd()) && id2.getPartitionNum() > id1.getPartitionNum())) {
          mergedSegmentId = SegmentId.merged(dataSource, id2.getInterval(), id2.getPartitionNum());
          final SegmentAnalysis tmp = arg1;
          arg1 = arg2;
          arg2 = tmp;
        } else {
          mergedSegmentId = SegmentId.merged(dataSource, id1.getInterval(), id1.getPartitionNum());
        }
        break;
      }
    }

    List<Interval> newIntervals = null;
    if (arg1.getIntervals() != null) {
      newIntervals = new ArrayList<>(arg1.getIntervals());
    }
    if (arg2.getIntervals() != null) {
      if (newIntervals == null) {
        newIntervals = new ArrayList<>();
      }
      newIntervals.addAll(arg2.getIntervals());
    }

    final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
    final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
    final LinkedHashMap<String, ColumnAnalysis> columns = new LinkedHashMap<>();

    Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
    for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
      final String columnName = entry.getKey();
      columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
      rightColumnNames.remove(columnName);
    }

    for (String columnName : rightColumnNames) {
      columns.put(columnName, rightColumns.get(columnName));
    }

    final Map<String, AggregatorFactory> aggregators = new HashMap<>();

    if (AggregatorMergeStrategy.LENIENT == aggregatorMergeStrategy) {
      // Merge each aggregator individually, ignoring nulls
      for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
        if (analysis.getAggregators() != null) {
          for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
            final String aggregatorName = entry.getKey();
            final AggregatorFactory aggregator = entry.getValue();
            final boolean isMergedYet = aggregators.containsKey(aggregatorName);
            AggregatorFactory merged;

            if (!isMergedYet) {
              merged = aggregator;
            } else {
              merged = aggregators.get(aggregatorName);

              if (merged != null && aggregator != null) {
                try {
                  merged = merged.getMergingFactory(aggregator);
                }
                catch (AggregatorFactoryNotMergeableException e) {
                  merged = null;
                }
              } else {
                merged = null;
              }
            }

            aggregators.put(aggregatorName, merged);
          }
        }
      }
    } else if (AggregatorMergeStrategy.STRICT == aggregatorMergeStrategy) {
      final AggregatorFactory[] aggs1 = arg1.getAggregators() != null
                                        ? arg1.getAggregators()
                                              .values()
                                              .toArray(new AggregatorFactory[0])
                                        : null;
      final AggregatorFactory[] aggs2 = arg2.getAggregators() != null
                                        ? arg2.getAggregators()
                                              .values()
                                              .toArray(new AggregatorFactory[0])
                                        : null;
      final AggregatorFactory[] merged = AggregatorFactory.mergeAggregators(Arrays.asList(aggs1, aggs2));
      if (merged != null) {
        for (AggregatorFactory aggregator : merged) {
          aggregators.put(aggregator.getName(), aggregator);
        }
      }
    } else if (AggregatorMergeStrategy.EARLIEST == aggregatorMergeStrategy) {
      // The segment analyses are already ordered above, where arg1 is the analysis pertaining to the latest interval
      // followed by arg2. So for earliest strategy, the iteration order should be arg2 and arg1.
      for (SegmentAnalysis analysis : ImmutableList.of(arg2, arg1)) {
        if (analysis.getAggregators() != null) {
          for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
            aggregators.putIfAbsent(entry.getKey(), entry.getValue());
          }
        }
      }
    } else if (AggregatorMergeStrategy.LATEST == aggregatorMergeStrategy) {
      // The segment analyses are already ordered above, where arg1 is the analysis pertaining to the latest interval
      // followed by arg2. So for latest strategy, the iteration order should be arg1 and arg2.
      for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
        if (analysis.getAggregators() != null) {
          for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
            aggregators.putIfAbsent(entry.getKey(), entry.getValue());
          }
        }
      }
    } else {
      throw DruidException.defensive("[%s] merge strategy is not implemented.", aggregatorMergeStrategy);
    }

    final TimestampSpec timestampSpec = TimestampSpec.mergeTimestampSpec(
        Lists.newArrayList(
            arg1.getTimestampSpec(),
            arg2.getTimestampSpec()
        )
    );

    final Granularity queryGranularity = Granularity.mergeGranularities(
        Lists.newArrayList(
            arg1.getQueryGranularity(),
            arg2.getQueryGranularity()
        )
    );

    final String mergedId;

    if (arg1.getId() != null && arg2.getId() != null && arg1.getId().equals(arg2.getId())) {
      mergedId = arg1.getId();
    } else {
      mergedId = mergedSegmentId == null ? "merged" : mergedSegmentId.toString();
    }

    final Boolean rollup;

    if (arg1.isRollup() != null && arg2.isRollup() != null && arg1.isRollup().equals(arg2.isRollup())) {
      rollup = arg1.isRollup();
    } else {
      rollup = null;
    }

    return new SegmentAnalysis(
        mergedId,
        newIntervals,
        columns,
        arg1.getSize() + arg2.getSize(),
        arg1.getNumRows() + arg2.getNumRows(),
        aggregators.isEmpty() ? null : aggregators,
        timestampSpec,
        queryGranularity,
        rollup
    );
  }