in indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java [646:878]
protected void reduce(BytesWritable key, Iterable<BytesWritable> values, final Context context)
    throws IOException, InterruptedException
{
  SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
  Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
  final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
  ListeningExecutorService persistExecutor = null;
  List<ListenableFuture<?>> persistFutures = new ArrayList<>();
  IncrementalIndex index = makeIncrementalIndex(
      bucket,
      combiningAggs,
      config,
      null,
      null
  );

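  // Everything below runs inside try/finally so that the in-memory index and the persist
  // executor are always cleaned up, even if indexing fails partway through.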
  try {
    File baseFlushFile = FileUtils.createTempDir("base-flush");
    Set<File> toMerge = new TreeSet<>();
    int indexCount = 0;
    int lineCount = 0;
    int runningTotalLineCount = 0;
    long startTime = System.currentTimeMillis();
    Set<String> allDimensionNames = new LinkedHashSet<>();
    final ProgressIndicator progressIndicator = makeProgressIndicator(context);

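    // When background persists are configured, use a bounded pool whose SynchronousQueue plus
    // blocking RejectedExecutionHandler makes submit() wait for a free thread instead of
    // queueing unbounded work; otherwise persists run inline on the reducer thread.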
    int numBackgroundPersistThreads = config.getSchema().getTuningConfig().getNumBackgroundPersistThreads();
    if (numBackgroundPersistThreads > 0) {
      final BlockingQueue<Runnable> queue = new SynchronousQueue<>();
      ExecutorService executorService = new ThreadPoolExecutor(
          numBackgroundPersistThreads,
          numBackgroundPersistThreads,
          0L,
          TimeUnit.MILLISECONDS,
          queue,
          Execs.makeThreadFactory("IndexGeneratorJob_persist_%d"),
          new RejectedExecutionHandler()
          {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
            {
              try {
                executor.getQueue().put(r);
              }
              catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
              }
            }
          }
      );
      persistExecutor = MoreExecutors.listeningDecorator(executorService);
    } else {
      persistExecutor = Execs.directExecutor();
    }

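    // Deserialize each reduce value back into an InputRow and add it to the in-memory index.
    // When the index can no longer accept rows, it is handed off for persisting to a numbered
    // spill file and replaced with a fresh index that keeps the dimension order seen so far.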
    for (final BytesWritable bw : values) {
      context.progress();

      final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(typeHelperMap, bw.getBytes(), aggregators));
      int numRows = index.add(inputRow).getRowCount();

      ++lineCount;

      if (!index.canAppendRow()) {
        allDimensionNames.addAll(index.getDimensionNames(false));

        log.info(index.getOutOfRowsReason());
        log.info(
            "%,d lines to %,d rows in %,d millis",
            lineCount - runningTotalLineCount,
            numRows,
            System.currentTimeMillis() - startTime
        );
        runningTotalLineCount = lineCount;

        final File file = new File(baseFlushFile, StringUtils.format("index%,05d", indexCount));
        toMerge.add(file);

        context.progress();
        final IncrementalIndex persistIndex = index;
        persistFutures.add(
            persistExecutor.submit(
                new ThreadRenamingRunnable(StringUtils.format("%s-persist", file.getName()))
                {
                  @Override
                  public void doRun()
                  {
                    try {
                      persist(persistIndex, interval, file, progressIndicator);
                    }
                    catch (Exception e) {
                      log.error(e, "persist index error");
                      throw new RuntimeException(e);
                    }
                    finally {
                      // close this index
                      persistIndex.close();
                    }
                  }
                }
            )
        );

        index = makeIncrementalIndex(
            bucket,
            combiningAggs,
            config,
            index.getDimensionOrder(),
            persistIndex.getColumnFormats()
        );
        startTime = System.currentTimeMillis();
        ++indexCount;
      }
    }

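    // The loop above may leave rows in memory. If nothing was spilled, that index becomes the
    // segment directly; otherwise it is persisted as a final spill file and all spill files
    // are merged into a single segment under "merged".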
    allDimensionNames.addAll(index.getDimensionNames(false));

    log.info("%,d lines completed.", lineCount);

    List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
    final File mergedBase;

    if (toMerge.size() == 0) {
      if (index.isEmpty()) {
        throw new IAE("If you try to persist empty indexes you are going to have a bad time");
      }

      mergedBase = new File(baseFlushFile, "merged");
      persist(index, interval, mergedBase, progressIndicator);
    } else {
      if (!index.isEmpty()) {
        final File finalFile = new File(baseFlushFile, "final");
        persist(index, interval, finalFile, progressIndicator);
        toMerge.add(finalFile);
      }

      Futures.allAsList(persistFutures).get(1, TimeUnit.HOURS);
      persistExecutor.shutdown();

      for (File file : toMerge) {
        indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
      }

      log.info("starting merge of intermediate persisted segments.");
      long mergeStartTime = System.currentTimeMillis();
      mergedBase = mergeQueryableIndex(indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator);
      log.info(
          "finished merge of intermediate persisted segments. time taken [%d] ms.",
          (System.currentTimeMillis() - mergeStartTime)
      );
    }

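    // Publish the merged segment: pick the shard spec to publish, build a DataSegment template,
    // zip and push the index files to the segment output path, and write the segment descriptor.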
    final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
        .getFileSystem(context.getConfiguration());

    // ShardSpec used for partitioning within this Hadoop job.
    final ShardSpec shardSpecForPartitioning = config.getShardSpec(bucket).getActualSpec();

    // ShardSpec to be published.
    final ShardSpec shardSpecForPublishing;
    if (config.isForceExtendableShardSpecs()) {
      shardSpecForPublishing = new NumberedShardSpec(
          shardSpecForPartitioning.getPartitionNum(),
          config.getShardSpecCount(bucket)
      );
    } else {
      shardSpecForPublishing = shardSpecForPartitioning;
    }

    final DataSegment segmentTemplate = new DataSegment(
        config.getDataSource(),
        interval,
        config.getSchema().getTuningConfig().getVersion(),
        null,
        ImmutableList.copyOf(allDimensionNames),
        metricNames,
        shardSpecForPublishing,
        -1,
        0
    );

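    // Write the merged index out as a zipped segment via JobHelper.serializeOutIndex, using the
    // index.zip and temporary paths derived from the configured segment output path; the
    // descriptor for the resulting segment is written just below.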
    final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = JobHelper.serializeOutIndex(
        segmentTemplate,
        context.getConfiguration(),
        context,
        mergedBase,
        JobHelper.makeFileNamePath(
            new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
            outputFS,
            segmentTemplate,
            JobHelper.INDEX_ZIP,
            HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
        ),
        JobHelper.makeTmpPath(
            new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
            outputFS,
            segmentTemplate,
            context.getTaskAttemptID(),
            HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
        ),
        HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
    );

    Path descriptorPath = config.makeDescriptorInfoPath(segmentAndIndexZipFilePath.getSegment());
    descriptorPath = JobHelper.prependFSIfNullScheme(
        FileSystem.get(
            descriptorPath.toUri(),
            context.getConfiguration()
        ), descriptorPath
    );

    log.info("Writing descriptor to path[%s]", descriptorPath);
    JobHelper.writeSegmentDescriptor(
        config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
        segmentAndIndexZipFilePath,
        descriptorPath,
        context
    );

    for (File file : toMerge) {
      FileUtils.deleteDirectory(file);
    }
  }
  catch (ExecutionException | TimeoutException e) {
    throw new RuntimeException(e);
  }
  finally {
    index.close();
    if (persistExecutor != null) {
      persistExecutor.shutdownNow();
    }
  }
}