in processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java [267:451]
public static SegmentAnalysis mergeAnalyses(
Set<String> dataSources,
SegmentAnalysis arg1,
SegmentAnalysis arg2,
AggregatorMergeStrategy aggregatorMergeStrategy
)
{
if (arg1 == null) {
return arg2;
}
if (arg2 == null) {
return arg1;
}
// This is a defensive check, since SegmentMetadata query instantiation guarantees a non-empty datasource set
if (CollectionUtils.isNullOrEmpty(dataSources)) {
throw InvalidInput.exception("SegmentMetadata queries require at least one datasource.");
}
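// Merged segment id, resolved below only if both analyses' ids parse against a common datasource.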
SegmentId mergedSegmentId = null;
// A union datasource can comprise multiple datasources, so we iterate through all of them to parse the segment id.
for (String dataSource : dataSources) {
final SegmentId id1 = SegmentId.tryParse(dataSource, arg1.getId());
final SegmentId id2 = SegmentId.tryParse(dataSource, arg2.getId());
// Swap arg1 and arg2 so the analysis with the later-ending interval comes first. This ensures we prefer the
// latest segment's column order, preserving it so callers see columns in their natural order.
if (id1 != null && id2 != null) {
if (id2.getIntervalEnd().isAfter(id1.getIntervalEnd()) ||
(id2.getIntervalEnd().isEqual(id1.getIntervalEnd()) && id2.getPartitionNum() > id1.getPartitionNum())) {
mergedSegmentId = SegmentId.merged(dataSource, id2.getInterval(), id2.getPartitionNum());
final SegmentAnalysis tmp = arg1;
arg1 = arg2;
arg2 = tmp;
} else {
mergedSegmentId = SegmentId.merged(dataSource, id1.getInterval(), id1.getPartitionNum());
}
break;
}
}
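// Union the interval lists from both analyses; the result stays null only if both inputs are null.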
List<Interval> newIntervals = null;
if (arg1.getIntervals() != null) {
newIntervals = new ArrayList<>(arg1.getIntervals());
}
if (arg2.getIntervals() != null) {
if (newIntervals == null) {
newIntervals = new ArrayList<>();
}
newIntervals.addAll(arg2.getIntervals());
}
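// Fold column analyses pairwise, keeping arg1's (latest) column order first, then appending columns unique to arg2.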
final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
final LinkedHashMap<String, ColumnAnalysis> columns = new LinkedHashMap<>();
Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet());
for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
final String columnName = entry.getKey();
columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
rightColumnNames.remove(columnName);
}
for (String columnName : rightColumnNames) {
columns.put(columnName, rightColumns.get(columnName));
}
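// Merge aggregators according to the requested strategy; an empty map is normalized to null in the result below.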
final Map<String, AggregatorFactory> aggregators = new HashMap<>();
if (AggregatorMergeStrategy.LENIENT == aggregatorMergeStrategy) {
// Merge each aggregator individually, ignoring nulls
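// A null value left in the map marks an aggregator whose definitions conflicted and could not be merged.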
for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
if (analysis.getAggregators() != null) {
for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
final String aggregatorName = entry.getKey();
final AggregatorFactory aggregator = entry.getValue();
final boolean isMergedYet = aggregators.containsKey(aggregatorName);
AggregatorFactory merged;
if (!isMergedYet) {
merged = aggregator;
} else {
merged = aggregators.get(aggregatorName);
if (merged != null && aggregator != null) {
try {
merged = merged.getMergingFactory(aggregator);
}
catch (AggregatorFactoryNotMergeableException e) {
merged = null;
}
} else {
merged = null;
}
}
aggregators.put(aggregatorName, merged);
}
}
}
} else if (AggregatorMergeStrategy.STRICT == aggregatorMergeStrategy) {
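// In strict mode, AggregatorFactory.mergeAggregators returns null when merging is not possible, so a single
// conflict discards all aggregators from the merged analysis.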
final AggregatorFactory[] aggs1 = arg1.getAggregators() != null
? arg1.getAggregators()
.values()
.toArray(new AggregatorFactory[0])
: null;
final AggregatorFactory[] aggs2 = arg2.getAggregators() != null
? arg2.getAggregators()
.values()
.toArray(new AggregatorFactory[0])
: null;
final AggregatorFactory[] merged = AggregatorFactory.mergeAggregators(Arrays.asList(aggs1, aggs2));
if (merged != null) {
for (AggregatorFactory aggregator : merged) {
aggregators.put(aggregator.getName(), aggregator);
}
}
} else if (AggregatorMergeStrategy.EARLIEST == aggregatorMergeStrategy) {
// The segment analyses were ordered above so that arg1 pertains to the later-ending interval. For the earliest
// strategy, we therefore iterate arg2 first: putIfAbsent keeps the first definition seen for each aggregator.
for (SegmentAnalysis analysis : ImmutableList.of(arg2, arg1)) {
if (analysis.getAggregators() != null) {
for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
aggregators.putIfAbsent(entry.getKey(), entry.getValue());
}
}
}
} else if (AggregatorMergeStrategy.LATEST == aggregatorMergeStrategy) {
// The segment analyses were ordered above so that arg1 pertains to the later-ending interval. For the latest
// strategy, we iterate arg1 first so putIfAbsent retains the latest definition of each aggregator.
for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
if (analysis.getAggregators() != null) {
for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
aggregators.putIfAbsent(entry.getKey(), entry.getValue());
}
}
}
} else {
throw DruidException.defensive("[%s] merge strategy is not implemented.", aggregatorMergeStrategy);
}
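// Both helpers below return null when their inputs disagree, leaving the merged value indeterminate.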
final TimestampSpec timestampSpec = TimestampSpec.mergeTimestampSpec(
Lists.newArrayList(
arg1.getTimestampSpec(),
arg2.getTimestampSpec()
)
);
final Granularity queryGranularity = Granularity.mergeGranularities(
Lists.newArrayList(
arg1.getQueryGranularity(),
arg2.getQueryGranularity()
)
);
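// Identical ids pass through unchanged; otherwise fall back to the merged segment id parsed above, or the
// literal "merged" if no id could be parsed.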
final String mergedId;
if (arg1.getId() != null && arg2.getId() != null && arg1.getId().equals(arg2.getId())) {
mergedId = arg1.getId();
} else {
mergedId = mergedSegmentId == null ? "merged" : mergedSegmentId.toString();
}
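// Rollup is only reported when both analyses agree; a mismatch or a missing value yields null (unknown).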
final Boolean rollup;
if (arg1.isRollup() != null && arg2.isRollup() != null && arg1.isRollup().equals(arg2.isRollup())) {
rollup = arg1.isRollup();
} else {
rollup = null;
}
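// Size and row counts are additive across the two analyses.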
return new SegmentAnalysis(
mergedId,
newIntervals,
columns,
arg1.getSize() + arg2.getSize(),
arg1.getNumRows() + arg2.getNumRows(),
aggregators.isEmpty() ? null : aggregators,
timestampSpec,
queryGranularity,
rollup
);
}