in solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java [228:511]
public void doSplit() throws IOException {
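// Two strategies, depending on splitMethod:
// - REWRITE (default): compute a per-partition bitset of docs for every leaf,
//   then copy the matching docs into each sub-index via IndexWriter.addIndexes
// - LINK: hard-link the parent's files into each sub-core's directory, then
//   delete the docs that do not belong to that partition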
List<LeafReaderContext> leaves = searcher.getRawReader().leaves();
Directory parentDirectory = searcher.getRawReader().directory();
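// REWRITE method only: for each leaf, one FixedBitSet per partition marking the
// docs that belong to that partition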
List<FixedBitSet[]> segmentDocSets = new ArrayList<>(leaves.size());
SolrIndexConfig parentConfig = searcher.getCore().getSolrConfig().indexConfig;
String timestamp = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
if (log.isInfoEnabled()) {
log.info("SolrIndexSplitter: partitions={} segments={}", numPieces, leaves.size());
}
RTimerTree t;
// this tracks round-robin assignment of docs to partitions
AtomicInteger currentPartition = new AtomicInteger();
if (splitMethod != SplitMethod.LINK) {
t = timings.sub("findDocSetsPerLeaf");
for (LeafReaderContext readerContext : leaves) {
assert readerContext.ordInParent == segmentDocSets.size(); // make sure we're going in order
FixedBitSet[] docSets =
split(
readerContext,
numPieces,
field,
rangesArr,
splitKey,
hashRouter,
currentPartition,
false);
segmentDocSets.add(docSets);
}
t.stop();
}
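// LINK method only: per-leaf bitsets of docs to delete, keyed by reader cache key;
// populated lazily by the first SplittingQuery and then reused for all partitions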
Map<IndexReader.CacheKey, FixedBitSet[]> docsToDeleteCache = new ConcurrentHashMap<>();
// would it be more efficient to write segment-at-a-time to each new index?
// - need to worry about the number of open file descriptors
// - need to worry about whether IW.addIndexes does a sync or not...
// - would be more efficient on the read side, but probably less efficient when merging
for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
String partitionName =
"SolrIndexSplitter:partition="
+ partitionNumber
+ ",partitionCount="
+ numPieces
+ (cmd.ranges != null ? ",range=" + cmd.ranges.get(partitionNumber) : "");
log.info(partitionName);
boolean success = false;
RefCounted<IndexWriter> iwRef = null;
IndexWriter iw;
if (cmd.cores != null && splitMethod != SplitMethod.LINK) {
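// target sub-cores were supplied (REWRITE method): reuse each sub-core's own IndexWriter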
SolrCore subCore = cmd.cores.get(partitionNumber);
iwRef = subCore.getUpdateHandler().getSolrCoreState().getIndexWriter(subCore);
iw = iwRef.get();
} else {
if (splitMethod == SplitMethod.LINK) {
SolrCore subCore = cmd.cores.get(partitionNumber);
String path = subCore.getDataDir() + INDEX_PREFIX + timestamp;
t = timings.sub("hardLinkCopy");
t.resume();
// copy by hard-linking
Directory splitDir =
subCore
.getDirectoryFactory()
.get(
path,
DirectoryFactory.DirContext.DEFAULT,
subCore.getSolrConfig().indexConfig.lockType);
// the wrapper doesn't hold any resources itself, so it doesn't need closing
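// it creates hard links where the filesystem supports them and falls back to a
// regular byte copy where it doesn't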
HardlinkCopyDirectoryWrapper hardLinkedDir = new HardlinkCopyDirectoryWrapper(splitDir);
boolean copiedOk = false;
try {
for (String file : parentDirectory.listAll()) {
// we've already closed the parent IndexWriter, so skip write.lock:
// the lock file may still be present even though the writer is closed,
// but we've already checked that the lock is not held by anyone else
if (file.equals(IndexWriter.WRITE_LOCK_NAME)) {
continue;
}
hardLinkedDir.copyFrom(parentDirectory, file, file, IOContext.DEFAULT);
}
copiedOk = true;
} finally {
if (!copiedOk) {
subCore.getDirectoryFactory().doneWithDirectory(splitDir);
subCore.getDirectoryFactory().remove(splitDir);
}
}
t.pause();
IndexWriterConfig iwConfig = parentConfig.toIndexWriterConfig(subCore);
// don't run merges at this time
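// (merges would rewrite segments and forfeit the space saved by hard-linking)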
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
t = timings.sub("createSubIW");
t.resume();
iw = new SolrIndexWriter(partitionName, splitDir, iwConfig);
t.pause();
} else {
SolrCore core = searcher.getCore();
String path = cmd.paths.get(partitionNumber);
t = timings.sub("createSubIW");
t.resume();
iw =
SolrIndexWriter.create(
core,
partitionName,
path,
core.getDirectoryFactory(),
true,
core.getLatestSchema(),
core.getSolrConfig().indexConfig,
core.getDeletionPolicy(),
core.getCodec());
t.pause();
}
}
try {
if (splitMethod == SplitMethod.LINK) {
t = timings.sub("deleteDocuments");
t.resume();
// apply the deletions specific to this partition. As a side effect, the first call
// also populates a cache of per-leaf docsets to delete for each partition, which is
// then reused for the subsequent partitions.
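// the query matches every doc that does NOT belong to this partition, so after
// the delete only this partition's docs remain live in the linked copy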
iw.deleteDocuments(
new SplittingQuery(
partitionNumber,
field,
rangesArr,
hashRouter,
splitKey,
docsToDeleteCache,
currentPartition));
t.pause();
} else {
// addIndexes drops deleted documents as it copies, but an optimize might still be
// needed because sub-shards will have the same number of segments as the parent shard.
t = timings.sub("addIndexes");
t.resume();
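// wrap each segment so that only this partition's bitset is exposed as its
// liveDocs; addIndexes then copies just those documents into the sub-index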
for (int segmentNumber = 0; segmentNumber < leaves.size(); segmentNumber++) {
if (log.isInfoEnabled()) {
log.info(
"SolrIndexSplitter: partition # {} partitionCount={} {} segment #={} segmentCount={}",
partitionNumber,
numPieces,
(cmd.ranges != null ? " range=" + cmd.ranges.get(partitionNumber) : ""),
segmentNumber,
leaves.size()); // nowarn
}
CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
iw.addIndexes(
new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
}
t.pause();
}
// we commit explicitly instead of sending a CommitUpdateCommand through the processor
// chain, because the sub-shard cores would simply ignore such a commit: their update
// log is not in an active state at this time.
// TODO no commitUpdateCommand
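// -1: there is no CommitUpdateCommand here whose version could be recorded (see TODO above)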
SolrIndexWriter.setCommitData(iw, -1, cmd.commitData);
t = timings.sub("subIWCommit");
t.resume();
iw.commit();
t.pause();
success = true;
} finally {
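// a writer borrowed from a sub-core is only decref'd, never closed here: the
// core owns its lifecycle; writers created by this method are closed below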
if (iwRef != null) {
iwRef.decref();
} else {
if (success) {
t = timings.sub("subIWClose");
t.resume();
iw.close();
t.pause();
} else {
IOUtils.closeWhileHandlingException(iw);
}
if (splitMethod == SplitMethod.LINK) {
SolrCore subCore = cmd.cores.get(partitionNumber);
subCore.getDirectoryFactory().release(iw.getDirectory());
}
}
}
}
// all sub-indexes were created ok
// when hard-linking was used, switch directories & refresh the cores
if (splitMethod == SplitMethod.LINK && cmd.cores != null) {
boolean switchOk = true;
t = timings.sub("switchSubIndexes");
for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
SolrCore subCore = cmd.cores.get(partitionNumber);
String indexDirPath = subCore.getIndexDir();
log.debug("Switching directories");
String hardLinkPath = subCore.getDataDir() + INDEX_PREFIX + timestamp;
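// rewrite index.properties to point the core at the hard-linked directory,
// then reopen the writer and searcher against it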
subCore.modifyIndexProps(INDEX_PREFIX + timestamp);
try {
subCore.getUpdateHandler().newIndexWriter(false);
openNewSearcher(subCore);
} catch (Exception e) {
log.error(
"Failed to switch sub-core {} to {}, split will fail", indexDirPath, hardLinkPath, e);
switchOk = false;
break;
}
}
t.stop();
if (!switchOk) {
t = timings.sub("rollbackSubIndexes");
// roll back the switch on every sub-core
for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
SolrCore subCore = cmd.cores.get(partitionNumber);
Directory dir = null;
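// deleting index.properties reverts the core to the default "index" directory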
try {
dir =
subCore
.getDirectoryFactory()
.get(
subCore.getDataDir(),
DirectoryFactory.DirContext.META_DATA,
subCore.getSolrConfig().indexConfig.lockType);
dir.deleteFile(IndexFetcher.INDEX_PROPERTIES);
} finally {
if (dir != null) {
subCore.getDirectoryFactory().release(dir);
}
}
// the switch itself was undone above; now remove the no-longer-needed hard-linked dir
String hardLinkPath = subCore.getDataDir() + INDEX_PREFIX + timestamp;
dir = null; // reset so a failure in get() below doesn't re-release the previous directory
try {
dir =
subCore
.getDirectoryFactory()
.get(
hardLinkPath,
DirectoryFactory.DirContext.DEFAULT,
subCore.getSolrConfig().indexConfig.lockType);
subCore.getDirectoryFactory().doneWithDirectory(dir);
subCore.getDirectoryFactory().remove(dir);
} finally {
if (dir != null) {
subCore.getDirectoryFactory().release(dir);
}
}
subCore.getUpdateHandler().newIndexWriter(false);
try {
openNewSearcher(subCore);
} catch (Exception e) {
log.warn("Error rolling back failed split of {}", hardLinkPath, e);
}
}
t.stop();
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR, "There were errors during index split");
} else {
// complete the switch: remove the original index
t = timings.sub("cleanSubIndex");
for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
SolrCore subCore = cmd.cores.get(partitionNumber);
String oldIndexPath = subCore.getDataDir() + "index";
Directory indexDir = null;
try {
indexDir =
subCore
.getDirectoryFactory()
.get(
oldIndexPath,
DirectoryFactory.DirContext.DEFAULT,
subCore.getSolrConfig().indexConfig.lockType);
subCore.getDirectoryFactory().doneWithDirectory(indexDir);
subCore.getDirectoryFactory().remove(indexDir);
} finally {
if (indexDir != null) {
subCore.getDirectoryFactory().release(indexDir);
}
}
}
t.stop();
}
}
}