in solr/core/src/java/org/apache/solr/update/UpdateLog.java [2082:2308]
/**
 * Replays the operations recorded in {@code translog}, starting at {@code
 * recoveryInfo.positionOfStart}, and finishes with a hard commit.
 *
 * <p>When {@code inSortedOrder} is true the log is read through a sorted reader and no executor is
 * used, so every operation runs on the calling thread. Otherwise adds and deletes-by-id are handed
 * to {@code execute(...)} together with the core container's replay executor. Deletes-by-query
 * always wait for outstanding tasks first and are then executed without an executor.
 *
 * <p>For an active log, the first time the reader reports no further entry, this method waits for
 * outstanding tasks, drops the executor (single-threaded from then on), blocks new updates via
 * {@code updateLocks.blockUpdates()} and reads once more. This method does not unblock updates.
 *
 * <p>If {@code cancelApplyBufferUpdate} is set, reading stops early, but the final commit is still
 * issued. Failures while processing a single entry are counted in {@code recoveryInfo.errors},
 * logged, and replay continues. A failure recorded in {@code exceptionOnExecuteUpdate}
 * (presumably set by {@code execute(...)}) is rethrown.
 *
 * <p>The reader is always closed and {@code translog} is always decref'ed on exit.
 *
 * @param translog the transaction log to replay; this method releases (decrefs) it
 * @throws UncheckedIOException if a reader for {@code translog} cannot be opened
 * @throws SolrException if an update executed via {@code execute(...)} recorded a failure
 */
public void doReplay(TransactionLog translog) {
  try {
    loglog.warn(
        "Starting log replay {} active={} starting pos={} inSortedOrder={}",
        translog,
        activeLog,
        recoveryInfo.positionOfStart,
        inSortedOrder);
    long lastStatusTime = System.nanoTime();
    try {
      if (inSortedOrder) {
        tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
      } else {
        tlogReader = translog.getReader(recoveryInfo.positionOfStart);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // NOTE: we don't currently handle a core reload during recovery. This would cause the core
    // to change underneath us.
    // Use a pool of URPs using a ThreadLocal to have them per-thread. URPs aren't threadsafe.
    UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
    Collection<UpdateRequestProcessor> procPool =
        Collections.synchronizedList(new ArrayList<>());
    // The per-thread requests backing the pooled URPs. Nothing outside this method holds a
    // reference to them, so they are closed here once every URP has been finished and closed.
    Collection<LocalSolrQueryRequest> requestPool =
        Collections.synchronizedList(new ArrayList<>());
    ThreadLocal<UpdateRequestProcessor> procThreadLocal =
        ThreadLocal.withInitial(
            () -> {
              // SolrQueryRequest is not thread-safe, so use a copy when creating URPs
              final var localRequest =
                  new LocalSolrQueryRequest(uhandler.core, BASE_REPLAY_PARAMS);
              requestPool.add(localRequest);
              var proc = processorChain.createProcessor(localRequest, rsp);
              procPool.add(proc);
              return proc;
            });
    // A null executor means "run on the calling thread".
    OrderedExecutor<BytesRef> executor =
        inSortedOrder ? null : req.getCoreContainer().getReplayUpdatesExecutor();
    AtomicInteger pendingTasks = new AtomicInteger(0);
    AtomicReference<SolrException> exceptionOnExecuteUpdate = new AtomicReference<>();
    long commitVersion = 0;
    int operationAndFlags = 0;
    long nextCount = 0;
    for (; ; ) {
      Object o = null;
      if (cancelApplyBufferUpdate) break;
      try {
        if (testing_logReplayHook != null) testing_logReplayHook.run();
        // Only look at the clock every 1000 entries; log progress at most once per STATUS_TIME.
        if (nextCount++ % 1000 == 0) {
          long now = System.nanoTime();
          if (now - lastStatusTime > STATUS_TIME) {
            lastStatusTime = now;
            long cpos = tlogReader.currentPos();
            long csize = tlogReader.currentSize();
            // Guard on the same logger that is written to below.
            if (loglog.isInfoEnabled()) {
              loglog.info(
                  "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
                  translog,
                  activeLog,
                  recoveryInfo.positionOfStart,
                  cpos,
                  csize,
                  Math.floor(cpos / (double) csize * 100.));
            }
          }
        }
        o = tlogReader.next();
        if (o == null && activeLog) {
          if (!finishing) {
            // about to block all the updates including the tasks in the executor
            // therefore we must wait for them to be finished
            waitForAllUpdatesGetExecuted(pendingTasks);
            // from this point, remain updates will be executed in a single thread
            executor = null;
            // block to prevent new adds, but don't immediately unlock since
            // we could be starved from ever completing recovery. Only unlock
            // after we've finished this recovery.
            // NOTE: our own updates won't be blocked since the thread holding a write lock can
            // lock a read lock.
            updateLocks.blockUpdates();
            finishing = true;
            o = tlogReader.next();
          } else {
            // we had previously blocked updates, so this "null" from the log is final.
            // Wait until our final commit to change the state and unlock.
            // This is only so no new updates are written to the current log file, and is
            // only an issue if we crash before the commit (and we are paying attention
            // to incomplete log files).
            //
            // versionInfo.unblockUpdates();
          }
        }
      } catch (Exception e) {
        log.error("Exception during replay", e);
      }
      if (o == null) break;
      // fail fast
      if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();
      try {
        // should currently be a List<Oper,Ver,Doc/Id>
        List<?> entry = (List<?>) o;
        operationAndFlags = (Integer) entry.get(UpdateLog.FLAGS_IDX);
        int oper = operationAndFlags & OPERATION_MASK;
        long version = (Long) entry.get(UpdateLog.VERSION_IDX);
        switch (oper) {
          case UpdateLog.UPDATE_INPLACE: // fall through to ADD
          case UpdateLog.ADD:
            {
              recoveryInfo.adds++;
              AddUpdateCommand cmd =
                  convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
              cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
              if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
              execute(cmd, executor, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate);
              break;
            }
          case UpdateLog.DELETE:
            {
              recoveryInfo.deletes++;
              byte[] idBytes = (byte[]) entry.get(2);
              DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
              cmd.setIndexedId(new BytesRef(idBytes));
              cmd.setVersion(version);
              cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
              if (debug) log.debug("delete {}", cmd);
              execute(cmd, executor, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate);
              break;
            }
          case UpdateLog.DELETE_BY_QUERY:
            {
              recoveryInfo.deleteByQuery++;
              String query = (String) entry.get(2);
              DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
              cmd.query = query;
              cmd.setVersion(version);
              cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
              if (debug) log.debug("deleteByQuery {}", cmd);
              waitForAllUpdatesGetExecuted(pendingTasks);
              // DBQ will be executed in the same thread
              execute(cmd, null, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate);
              break;
            }
          case UpdateLog.COMMIT:
            {
              // Only remembered; the single commit is issued after the loop.
              commitVersion = version;
              break;
            }
          default:
            throw new SolrException(
                SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
        }
        // NOTE(review): rsp is shared by every pooled URP and is not reset in this method, so
        // once it carries an exception every later entry also takes this path. Confirm intended.
        if (rsp.getException() != null) {
          // Trailing throwable with no placeholder, so the stack trace is logged.
          loglog.error("REPLAY_ERR: Exception replaying log", rsp.getException());
          throw rsp.getException();
        }
        if (state == State.REPLAYING) {
          replayOpsMeter.mark();
        } else if (state == State.APPLYING_BUFFERED) {
          applyingBufferedOpsMeter.mark();
        } else {
          // XXX should not happen?
        }
      } catch (ClassCastException cl) {
        recoveryInfo.errors.incrementAndGet();
        loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log. Entry={}", o, cl);
        // would be caused by a corrupt transaction log
      } catch (Exception ex) {
        recoveryInfo.errors.incrementAndGet();
        loglog.warn("REPLAY_ERR: Exception replaying log", ex);
        // something wrong with the request?
      }
      assert TestInjection.injectUpdateLogReplayRandomPause();
    }
    waitForAllUpdatesGetExecuted(pendingTasks);
    if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();
    CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
    cmd.setVersion(commitVersion);
    cmd.softCommit = false;
    cmd.waitSearcher = true;
    cmd.setFlags(UpdateCommand.REPLAY);
    try {
      if (debug) log.debug("commit {}", cmd);
      // this should cause a commit to be added to the incomplete log and avoid it being
      // replayed again after a restart.
      uhandler.commit(cmd);
    } catch (IOException ex) {
      recoveryInfo.errors.incrementAndGet();
      loglog.error("Replay exception: final commit.", ex);
    }
    if (!activeLog) {
      // if we are replaying an old tlog file, we need to add a commit to the end
      // so we don't replay it again if we restart right after.
      translog.writeCommit(cmd);
    }
    for (UpdateRequestProcessor proc : procPool) {
      try {
        proc.finish();
      } catch (IOException ex) {
        recoveryInfo.errors.incrementAndGet();
        loglog.error("Replay exception: finish()", ex);
      } finally {
        IOUtils.closeQuietly(proc);
      }
    }
    // Release the per-thread requests only after every URP that used them has been closed.
    for (LocalSolrQueryRequest localRequest : requestPool) {
      try {
        localRequest.close();
      } catch (RuntimeException ex) {
        loglog.warn("Replay exception: closing request", ex);
      }
    }
  } finally {
    if (tlogReader != null) tlogReader.close();
    translog.decref();
  }
}