in lucene/core/src/java/org/apache/lucene/index/IndexWriter.java [5156:5501]
private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
testPoint("mergeMiddleStart");
merge.checkAborted();
Directory mergeDirectory = mergeScheduler.wrapForMerge(merge, directory);
IOContext context = IOContext.merge(merge.getStoreMergeInfo());
final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory);
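// Note: the merge scheduler may wrap the directory (e.g. ConcurrentMergeScheduler rate-limits
// merge writes), and the tracking wrapper records every file this merge creates so that
// merge.info can later be set to exactly those files (see setFiles below).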
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merging " + segString(merge.segments));
}
// This is try/finally to make sure merger's readers are
// closed:
boolean success = false;
try {
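// Open a pooled ReadersAndUpdates for every segment in the merge, mark it as merging, and
// incRef the segment's files so they cannot be deleted while the merge is running.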
merge.initMergeReaders(
sci -> {
final ReadersAndUpdates rld = getPooledInstance(sci, true);
rld.setIsMerging();
synchronized (this) {
return rld.getReaderForMerge(
context, mr -> deleter.incRef(mr.reader.getSegmentInfo().files()));
}
});
// Let the merge wrap readers
List<CodecReader> mergeReaders = new ArrayList<>();
Counter softDeleteCount = Counter.newCounter(false);
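// Let the OneMerge wrap each reader (a wrapper may change which documents are live); when soft
// deletes are enabled, the accounting below keeps the merged segment's soft-delete count
// accurate even if the wrapper preserves soft-deleted docs.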
for (MergePolicy.MergeReader mergeReader : merge.getMergeReader()) {
SegmentReader reader = mergeReader.reader;
CodecReader wrappedReader = merge.wrapForMerge(reader);
validateMergeReader(wrappedReader);
if (softDeletesEnabled) {
if (reader != wrappedReader) {
// if we don't have a wrapped reader we won't preserve any soft-deletes
Bits hardLiveDocs = mergeReader.hardLiveDocs;
// we only need to do this accounting if we have mixed deletes
if (hardLiveDocs != null) {
Bits wrappedLiveDocs = wrappedReader.getLiveDocs();
Counter hardDeleteCounter = Counter.newCounter(false);
countSoftDeletes(
wrappedReader, wrappedLiveDocs, hardLiveDocs, softDeleteCount, hardDeleteCounter);
int hardDeleteCount = Math.toIntExact(hardDeleteCounter.get());
// Wrap the wrapped reader again if we have excluded some hard-deleted docs
if (hardDeleteCount > 0) {
Bits liveDocs =
wrappedLiveDocs == null
? hardLiveDocs
: new Bits() {
@Override
public boolean get(int index) {
return hardLiveDocs.get(index) && wrappedLiveDocs.get(index);
}
@Override
public int length() {
return hardLiveDocs.length();
}
};
wrappedReader =
FilterCodecReader.wrapLiveDocs(
wrappedReader, liveDocs, wrappedReader.numDocs() - hardDeleteCount);
}
} else {
final int carryOverSoftDeletes =
reader.getSegmentInfo().getSoftDelCount() - wrappedReader.numDeletedDocs();
assert carryOverSoftDeletes >= 0 : "carry-over soft-deletes must be non-negative";
assert assertSoftDeletesCount(wrappedReader, carryOverSoftDeletes);
softDeleteCount.addAndGet(carryOverSoftDeletes);
}
}
}
mergeReaders.add(wrappedReader);
}
final Executor intraMergeExecutor = mergeScheduler.getIntraMergeExecutor(merge);
MergeState.DocMap[] reorderDocMaps = null;
// Don't reorder if an explicit sort is configured.
final boolean hasIndexSort = config.getIndexSort() != null;
// Don't reorder if blocks can't be identified using the parent field.
final boolean hasBlocksButNoParentField =
mergeReaders.stream().map(LeafReader::getMetaData).anyMatch(LeafMetaData::hasBlocks)
&& mergeReaders.stream()
.map(CodecReader::getFieldInfos)
.map(FieldInfos::getParentField)
.anyMatch(Objects::isNull);
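// Without a parent field there is no way to keep parent/child doc blocks intact while
// reordering, so reordering is skipped in that case.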
if (hasIndexSort == false && hasBlocksButNoParentField == false) {
// Create a merged view of the input segments. This effectively does the merge.
CodecReader mergedView = SlowCompositeCodecReaderWrapper.wrap(mergeReaders);
Sorter.DocMap docMap = merge.reorder(mergedView, directory, intraMergeExecutor);
if (docMap != null) {
reorderDocMaps = new MergeState.DocMap[mergeReaders.size()];
int docBase = 0;
int i = 0;
for (CodecReader reader : mergeReaders) {
final int currentDocBase = docBase;
reorderDocMaps[i] =
docID -> {
Objects.checkIndex(docID, reader.maxDoc());
return docMap.oldToNew(currentDocBase + docID);
};
i++;
docBase += reader.maxDoc();
}
// This makes merging more expensive as it disables some bulk merging optimizations, so
// only do this if a non-null DocMap is returned.
mergeReaders =
Collections.singletonList(SortingCodecReader.wrap(mergedView, docMap, null));
}
}
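// Build the SegmentMerger over the wrapped (and possibly reordered) readers; output files are
// created through dirWrapper so they can be tracked.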
final SegmentMerger merger =
new SegmentMerger(
mergeReaders,
merge.info.info,
infoStream,
dirWrapper,
globalFieldNumberMap,
context,
intraMergeExecutor);
merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
merge.checkAborted();
MergeState mergeState = merger.mergeState;
MergeState.DocMap[] docMaps;
if (reorderDocMaps == null) {
docMaps = mergeState.docMaps;
} else {
// Since the docs were reordered, we passed a single merged view to the SegmentMerger, so from
// its perspective there is only one input segment and the SlowCompositeCodecReaderWrapper is
// effectively doing the merge.
assert mergeState.docMaps.length == 1
: "Got " + mergeState.docMaps.length + " docMaps, but expected 1";
MergeState.DocMap compactionDocMap = mergeState.docMaps[0];
docMaps = new MergeState.DocMap[reorderDocMaps.length];
for (int i = 0; i < docMaps.length; ++i) {
MergeState.DocMap reorderDocMap = reorderDocMaps[i];
docMaps[i] = docID -> compactionDocMap.get(reorderDocMap.get(docID));
}
}
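// Example with hypothetical values: for doc 3 of an input segment whose docBase is 100,
// docMaps[i].get(3) first applies the reorder map (oldToNew(103) -> its position in the
// reordered view) and then the compaction map (-> its final doc id in the merged segment,
// with deleted docs squeezed out).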
merge.mergeStartNS = System.nanoTime();
// This is where all the work happens:
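// merger.merge() writes the new segment's postings, doc values, stored fields, term vectors,
// points, norms and knn vectors with the configured codec; shouldMerge() is false when all
// input docs are deleted, in which case nothing is written.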
if (merger.shouldMerge()) {
merger.merge();
}
assert mergeState.segmentInfo == merge.info.info;
merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles()));
Codec codec = config.getCodec();
if (infoStream.isEnabled("IW")) {
if (merger.shouldMerge()) {
String pauseInfo =
merge.getMergeProgress().getPauseTimes().entrySet().stream()
.filter((e) -> e.getValue() > 0)
.map(
(e) ->
String.format(
Locale.ROOT,
"%.1f sec %s",
e.getValue() / (double) TimeUnit.SECONDS.toNanos(1),
e.getKey().name().toLowerCase(Locale.ROOT)))
.collect(Collectors.joining(", "));
if (!pauseInfo.isEmpty()) {
pauseInfo = " (" + pauseInfo + ")";
}
long t1 = System.nanoTime();
double sec = (t1 - merge.mergeStartNS) / (double) TimeUnit.SECONDS.toNanos(1);
double segmentMB = (merge.info.sizeInBytes() / 1024. / 1024.);
infoStream.message(
"IW",
("merge codec=" + codec)
+ (" maxDoc=" + merge.info.info.maxDoc())
+ ("; merged segment has "
+ (mergeState.mergeFieldInfos.hasTermVectors() ? "vectors" : "no vectors"))
+ ("; " + (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms"))
+ ("; "
+ (mergeState.mergeFieldInfos.hasDocValues() ? "docValues" : "no docValues"))
+ ("; " + (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox"))
+ ("; " + (mergeState.mergeFieldInfos.hasFreq() ? "freqs" : "no freqs"))
+ ("; " + (mergeState.mergeFieldInfos.hasPointValues() ? "points" : "no points"))
+ ("; "
+ String.format(
Locale.ROOT,
"%.1f sec%s to merge segment [%.2f MB, %.2f MB/sec]",
sec,
pauseInfo,
segmentMB,
segmentMB / sec)));
} else {
infoStream.message("IW", "skip merging fully deleted segments");
}
}
if (merger.shouldMerge() == false) {
// Merge would produce a 0-doc segment, so we do nothing except commit the merge to remove
// all the 0-doc segments that we "merged":
assert merge.info.info.maxDoc() == 0;
success = commitMerge(merge, docMaps);
return 0;
}
assert merge.info.info.maxDoc() > 0;
// Very important to do this before opening the reader
// because codec must know if prox was written for
// this segment:
boolean useCompoundFile;
synchronized (this) { // Guard segmentInfos
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, merge.info, this);
}
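// If the merge policy requests a compound file, pack the newly written files into a CFS; on
// failure the new files are cleaned up below, and an abort during CFS creation makes the merge
// return early.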
if (useCompoundFile) {
success = false;
Collection<String> filesToRemove = merge.info.files();
// NOTE: Creation of the CFS file must be performed with the original
// directory rather than with the merging directory, so that it is not
// subject to merge throttling.
TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(directory);
try {
createCompoundFile(
infoStream, trackingCFSDir, merge.info.info, context, this::deleteNewFiles);
success = true;
} catch (Throwable t) {
synchronized (this) {
if (merge.isAborted()) {
// This can happen if rollback is called while we were building
// our CFS -- fall through to logic below to remove the non-CFS
// merged files:
if (infoStream.isEnabled("IW")) {
infoStream.message(
"IW", "hit merge abort exception creating compound file during merge");
}
return 0;
} else {
handleMergeException(t, merge);
}
}
} finally {
if (success == false) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception creating compound file during merge");
}
// Safe: these files must exist
deleteNewFiles(merge.info.files());
}
}
// So that, if we hit exc in deleteNewFiles (next)
// or in commitMerge (later), we close the
// per-segment readers in the finally clause below:
success = false;
synchronized (this) {
// delete new non cfs files directly: they were never
// registered with IFD
deleteNewFiles(filesToRemove);
if (merge.isAborted()) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "abort merge after building CFS");
}
// Safe: these files must exist
deleteNewFiles(merge.info.files());
return 0;
}
}
merge.info.info.setUseCompoundFile(true);
} else {
// So that, if we hit exc in commitMerge (later),
// we close the per-segment readers in the finally
// clause below:
success = false;
}
merge.setMergeInfo(merge.info);
// Have codec write SegmentInfo. Must do this after
// creating CFS so that 1) .si isn't slurped into CFS,
// and 2) .si reflects useCompoundFile=true change
// above:
boolean success2 = false;
try {
codec.segmentInfoFormat().write(directory, merge.info.info, context);
success2 = true;
} finally {
if (!success2) {
// Safe: these files must exist
deleteNewFiles(merge.info.files());
}
}
// TODO: ideally we would freeze merge.info here!!
// because any changes after writing the .si will be
// lost...
if (infoStream.isEnabled("IW")) {
infoStream.message(
"IW",
String.format(
Locale.ROOT,
"merged segment size=%.3f MB vs estimate=%.3f MB",
merge.info.sizeInBytes() / 1024. / 1024.,
merge.estimatedMergeBytes / 1024. / 1024.));
}
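// If a merged-segment warmer is configured (and the reader pool is enabled), open a reader on
// the freshly merged segment and warm it now, before the merge is committed and the segment
// becomes visible to searches.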
final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer();
if (readerPool.isReaderPoolingEnabled() && mergedSegmentWarmer != null) {
final ReadersAndUpdates rld = getPooledInstance(merge.info, true);
final SegmentReader sr = rld.getReader(IOContext.DEFAULT);
try {
mergedSegmentWarmer.warm(sr);
} finally {
synchronized (this) {
rld.release(sr);
release(rld);
}
}
}
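// commitMerge installs the new segment into segmentInfos and uses docMaps to carry over any
// deletes/updates that arrived against the old segments while the merge was running; it
// returns false if the merge was aborted in the meantime.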
if (!commitMerge(merge, docMaps)) {
// commitMerge will return false if this merge was
// aborted
return 0;
}
success = true;
} finally {
// Readers are already closed in commitMerge if we didn't hit
// an exc:
if (success == false) {
closeMergeReaders(merge, true, false);
}
}
return merge.info.info.maxDoc();
}