in indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java [697:933]
protected void innerReduce(Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable)
    throws IOException
{
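  // Reduce input for one time bucket: the group key holds the reducer partition (4 bytes) followed by the
  // bucket start timestamp, and the values arrive as (dims, values, numRows) counts grouped by candidate
  // dimension and ordered by value.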
  final ByteBuffer groupKey = ByteBuffer.wrap(keyBytes.getGroupKey());
  groupKey.position(4); // Skip partition
  final DateTime bucket = DateTimes.utc(groupKey.getLong());
  final PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());

  log.info(
      "Determining partitions for interval: %s",
      config.getGranularitySpec().bucketInterval(bucket).orNull()
  );

  // First DVC should be the total row count indicator
  final DimValueCount firstDvc = iterator.next();
  final long totalRows = firstDvc.numRows;

  if (!firstDvc.dims.isEmpty() || firstDvc.values.size() != 0) {
    throw new IllegalStateException("Expected total row indicator on first k/v pair");
  }
// "iterator" will now take us over many candidate dimensions
DimPartitions currentDimPartitions = null;
DimPartition currentDimPartition = null;
StringTuple currentDimPartitionStart = null;
boolean currentDimSkip = false;
// We'll store possible partitions in here
final Map<List<String>, DimPartitions> dimPartitionss = new HashMap<>();
final DimensionRangePartitionsSpec partitionsSpec = (DimensionRangePartitionsSpec) config.getPartitionsSpec();
final DeterminePartitionsJobSampler sampler = createSampler(config);
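
  // Greedily pack consecutive values of each candidate dimension into partitions of roughly the sampled
  // target size; each dimension that is not skipped contributes one candidate entry to dimPartitionss.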
  while (iterator.hasNext()) {
    final DimValueCount dvc = iterator.next();

    if (currentDimPartitions == null || !currentDimPartitions.dims.equals(dvc.dims)) {
      // Starting a new dimension! Exciting!
      currentDimPartitions = new DimPartitions(dvc.dims);
      currentDimPartition = new DimPartition();
      currentDimPartitionStart = null;
      currentDimSkip = false;
    }

    // Respect poisoning
    if (!currentDimSkip && dvc.numRows < 0) {
      log.info("Cannot partition on multi-value dimension: %s", dvc.dims);
      currentDimSkip = true;
    }

    if (currentDimSkip) {
      continue;
    }
    // See if we need to cut a new partition ending immediately before this dimension value
    if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > sampler.getSampledTargetPartitionSize()) {
      final ShardSpec shardSpec = createShardSpec(
          partitionsSpec instanceof SingleDimensionPartitionsSpec,
          currentDimPartitions.dims,
          currentDimPartitionStart,
          dvc.values,
          currentDimPartitions.partitions.size(),
          // Set unknown core partitions size so that the legacy way is used for checking partitionHolder
          // completeness. See SingleDimensionShardSpec.createChunk().
          SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
      );

      log.info(
          "Adding possible shard with %,d rows and %,d unique values: %s",
          currentDimPartition.rows,
          currentDimPartition.cardinality,
          shardSpec
      );

      currentDimPartition.shardSpec = shardSpec;
      currentDimPartitions.partitions.add(currentDimPartition);
      currentDimPartition = new DimPartition();
      currentDimPartitionStart = dvc.values;
    }
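
    // Whether or not a shard was just cut, this value itself always counts toward the now-current partition.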
    // Update counters
    currentDimPartition.cardinality++;
    currentDimPartition.rows += dvc.numRows;

    if (!iterator.hasNext() || !currentDimPartitions.dims.equals(iterator.peek().dims)) {
      // Finalize the current dimension

      if (currentDimPartition.rows > 0) {
        // One more shard to go
        final ShardSpec shardSpec;

        if (currentDimPartition.rows < sampler.getSampledTargetPartitionSize() * SHARD_COMBINE_THRESHOLD &&
            !currentDimPartitions.partitions.isEmpty()) {
          // Combine with previous shard if it exists and the current shard is small enough
          final DimPartition previousDimPartition = currentDimPartitions.partitions.remove(
              currentDimPartitions.partitions.size() - 1
          );

          final DimensionRangeShardSpec previousShardSpec = (DimensionRangeShardSpec) previousDimPartition.shardSpec;

          shardSpec = createShardSpec(
              partitionsSpec instanceof SingleDimensionPartitionsSpec,
              currentDimPartitions.dims,
              previousShardSpec.getStartTuple(),
              null,
              previousShardSpec.getPartitionNum(),
              // Set unknown core partitions size so that the legacy way is used for checking partitionHolder
              // completeness. See SingleDimensionShardSpec.createChunk().
              SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
          );

          log.info("Removing possible shard: %s", previousShardSpec);

          currentDimPartition.rows += previousDimPartition.rows;
          currentDimPartition.cardinality += previousDimPartition.cardinality;
        } else {
          // Create new shard
          shardSpec = createShardSpec(
              partitionsSpec instanceof SingleDimensionPartitionsSpec,
              currentDimPartitions.dims,
              currentDimPartitionStart,
              null,
              currentDimPartitions.partitions.size(),
              // Set unknown core partitions size so that the legacy way is used for checking partitionHolder
              // completeness. See SingleDimensionShardSpec.createChunk().
              SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
          );
        }

        log.info(
            "Adding possible shard with %,d rows and %,d unique values: %s",
            currentDimPartition.rows,
            currentDimPartition.cardinality,
            shardSpec
        );

        currentDimPartition.shardSpec = shardSpec;
        currentDimPartitions.partitions.add(currentDimPartition);
      }

      log.info(
          "Completed dimension[%s]: %,d possible shards with %,d unique values",
          currentDimPartitions.dims,
          currentDimPartitions.partitions.size(),
          currentDimPartitions.getCardinality()
      );

      // Add ourselves to the partitions map
      dimPartitionss.put(currentDimPartitions.dims, currentDimPartitions);
    }
  }
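
  // A dimension qualifies only if it appears in every row and produces no oversized shard. Among qualifying
  // dimensions, prefer the one with the highest cardinality when it exceeds HIGH_CARDINALITY_THRESHOLD,
  // otherwise the one whose partition sizes are closest to the sampled target.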
  // Choose best dimension
  if (dimPartitionss.isEmpty()) {
    throw new ISE("No suitable partitioning dimension found!");
  }

  int maxCardinality = Integer.MIN_VALUE;
  long minDistance = Long.MAX_VALUE;
  DimPartitions minDistancePartitions = null;
  DimPartitions maxCardinalityPartitions = null;

  for (final DimPartitions dimPartitions : dimPartitionss.values()) {
    if (dimPartitions.getRows() != totalRows) {
      log.info(
          "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
          dimPartitions.dims,
          dimPartitions.getRows(),
          totalRows
      );

      continue;
    }

    // Make sure none of these shards are oversized
    boolean oversized = false;
    for (final DimPartition partition : dimPartitions.partitions) {
      if (partition.rows > sampler.getSampledMaxRowsPerSegment()) {
        log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dims, partition.shardSpec);
        oversized = true;
      }
    }

    if (oversized) {
      continue;
    }
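
    // This dimension qualifies; remember it under both criteria so the final choice can be made after the loop.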
    final int cardinality = dimPartitions.getCardinality();
    final long distance = dimPartitions.getDistanceSquaredFromTarget(sampler.getSampledTargetPartitionSize());

    if (cardinality > maxCardinality) {
      maxCardinality = cardinality;
      maxCardinalityPartitions = dimPartitions;
    }

    if (distance < minDistance) {
      minDistance = distance;
      minDistancePartitions = dimPartitions;
    }
  }

  if (maxCardinalityPartitions == null) {
    throw new ISE("No suitable partitioning dimension found!");
  }
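
  // Persist the winning shard specs as JSON at the segment partition info path for this interval.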
  final OutputStream out = Utils.makePathAndOutputStream(
      context,
      config.makeSegmentPartitionInfoPath(config.getGranularitySpec().bucketInterval(bucket).get()),
      config.isOverwriteFiles()
  );

  final DimPartitions chosenPartitions = maxCardinality > HIGH_CARDINALITY_THRESHOLD
                                         ? maxCardinalityPartitions
                                         : minDistancePartitions;

  final List<ShardSpec> chosenShardSpecs = Lists.transform(
      chosenPartitions.partitions,
      dimPartition -> dimPartition.shardSpec
  );

  log.info("Chosen partitions:");
  for (ShardSpec shardSpec : chosenShardSpecs) {
    log.info(" %s", HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(shardSpec));
  }

  try {
    HadoopDruidIndexerConfig.JSON_MAPPER
        .writerFor(
            new TypeReference<List<ShardSpec>>()
            {
            }
        )
        .writeValue(out, chosenShardSpecs);
  }
  finally {
    Closeables.close(out, false);
  }
}