public void doReplay(TransactionLog translog)

in solr/core/src/java/org/apache/solr/update/UpdateLog.java [2082:2308]


    /**
     * Replays the entries of {@code translog}, starting at {@code recoveryInfo.positionOfStart}:
     * each entry is turned back into an add / delete / delete-by-query command and passed to
     * {@code execute}, and once the log is exhausted a hard commit is issued.
     *
     * <p>Reading: a sorted reader is used when {@code inSortedOrder} is set, and in that case no
     * executor is handed to {@code execute}; otherwise the core container's replay-updates executor
     * is used until either a delete-by-query is encountered (which always waits for pending tasks
     * and runs with a {@code null} executor) or the end of an active log is reached (after which
     * the executor is dropped for the rest of the replay).
     *
     * <p>Error handling as implemented here:
     *
     * <ul>
     *   <li>Any exception thrown while fetching the next entry is logged and leaves the entry
     *       {@code null}, which ends the read loop; replay then proceeds to the final commit.
     *   <li>Any exception thrown while converting or executing a single entry increments {@code
     *       recoveryInfo.errors}, is logged, and replay continues with the next entry.
     *   <li>An exception recorded in {@code exceptionOnExecuteUpdate} is rethrown ("fail fast")
     *       and propagates out of this method.
     *   <li>An {@link IOException} from the final commit or from a processor's {@code finish()} is
     *       counted in {@code recoveryInfo.errors} and logged, but not rethrown.
     * </ul>
     *
     * <p>On every exit path the reader (if one was opened) is closed and {@code translog.decref()}
     * is called.
     *
     * @param translog the transaction log to replay; {@code decref()} is called on it before this
     *     method returns
     */
    public void doReplay(TransactionLog translog) {
      try {
        loglog.warn(
            "Starting log replay {}  active={} starting pos={} inSortedOrder={}",
            translog,
            activeLog,
            recoveryInfo.positionOfStart,
            inSortedOrder);
        // Timestamp of the last progress message; used below to rate-limit status logging.
        long lastStatusTime = System.nanoTime();
        try {
          if (inSortedOrder) {
            tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
          } else {
            tlogReader = translog.getReader(recoveryInfo.positionOfStart);
          }
        } catch (IOException e) {
          // Surface reader-creation failures as unchecked; the outer finally still runs.
          throw new UncheckedIOException(e);
        }

        // NOTE: we don't currently handle a core reload during recovery.  This would cause the core
        // to change underneath us.

        // Use a pool of URPs using a ThreadLocal to have them per-thread.  URPs aren't threadsafe.
        // Every processor created by the ThreadLocal initializer is also recorded in procPool so
        // that it can be finished and closed at the end of the replay.
        UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
        Collection<UpdateRequestProcessor> procPool =
            Collections.synchronizedList(new ArrayList<>());
        ThreadLocal<UpdateRequestProcessor> procThreadLocal =
            ThreadLocal.withInitial(
                () -> {
                  // SolrQueryRequest is not thread-safe, so use a copy when creating URPs
                  final var localRequest =
                      new LocalSolrQueryRequest(uhandler.core, BASE_REPLAY_PARAMS);
                  var proc = processorChain.createProcessor(localRequest, rsp);
                  procPool.add(proc);
                  return proc;
                });

        // No executor is used when replaying in sorted order; a null executor is what gets passed
        // to execute() in that case.
        OrderedExecutor<BytesRef> executor =
            inSortedOrder ? null : req.getCoreContainer().getReplayUpdatesExecutor();
        // Passed to execute() and waitForAllUpdatesGetExecuted() to track outstanding tasks.
        AtomicInteger pendingTasks = new AtomicInteger(0);
        // Holds a failure reported by execute(); checked below to abort the replay early.
        AtomicReference<SolrException> exceptionOnExecuteUpdate = new AtomicReference<>();

        // Version of the most recent COMMIT entry seen in the log (0 if none); applied to the
        // final commit command.
        long commitVersion = 0;
        int operationAndFlags = 0;
        // Number of loop iterations so far; only used to sample the clock every 1000 iterations.
        long nextCount = 0;

        for (; ; ) {
          Object o = null;
          // Stop reading as soon as cancellation has been requested.
          if (cancelApplyBufferUpdate) break;
          try {
            if (testing_logReplayHook != null) testing_logReplayHook.run();
            // Check the clock only once per 1000 iterations, and log progress only if more than
            // STATUS_TIME has elapsed since the previous status message.
            if (nextCount++ % 1000 == 0) {
              long now = System.nanoTime();
              if (now - lastStatusTime > STATUS_TIME) {
                lastStatusTime = now;
                long cpos = tlogReader.currentPos();
                long csize = tlogReader.currentSize();
                if (log.isInfoEnabled()) {
                  loglog.info(
                      "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
                      translog,
                      activeLog,
                      recoveryInfo.positionOfStart,
                      cpos,
                      csize,
                      Math.floor(cpos / (double) csize * 100.));
                }
              }
            }

            // NOTE(review): o is already null at this point; the reset below is redundant.
            o = null;
            o = tlogReader.next();
            // A null entry means the reader has nothing more right now. For an active log that
            // may not be final, so updates are blocked and the read is retried once.
            if (o == null && activeLog) {
              if (!finishing) {
                // about to block all the updates including the tasks in the executor
                // therefore we must wait for them to be finished
                waitForAllUpdatesGetExecuted(pendingTasks);
                // from this point, remaining updates will be executed in a single thread
                executor = null;
                // block to prevent new adds, but don't immediately unlock since
                // we could be starved from ever completing recovery.  Only unlock
                // after we've finished this recovery.
                // NOTE: our own updates won't be blocked since the thread holding a write lock can
                // lock a read lock.
                updateLocks.blockUpdates();
                finishing = true;
                o = tlogReader.next();
              } else {
                // we had previously blocked updates, so this "null" from the log is final.

                // Wait until our final commit to change the state and unlock.
                // This is only so no new updates are written to the current log file, and is
                // only an issue if we crash before the commit (and we are paying attention
                // to incomplete log files).
                //
                // versionInfo.unblockUpdates();
              }
            }
          } catch (Exception e) {
            // Every statement in the try above that can throw does so while o is still null, so
            // after logging we fall through to the null check below and leave the loop.
            // recoveryInfo.errors is not incremented on this path.
            log.error("Exception during replay", e);
          }

          if (o == null) break;
          // fail fast
          // This rethrow sits outside the per-entry try below, so it is not swallowed there and
          // propagates out of doReplay (through the outer finally).
          if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();

          try {

            // should currently be a List<Oper,Ver,Doc/Id>
            List<?> entry = (List<?>) o;
            operationAndFlags = (Integer) entry.get(UpdateLog.FLAGS_IDX);
            // The operation code is the masked part of the combined operation-and-flags value.
            int oper = operationAndFlags & OPERATION_MASK;
            long version = (Long) entry.get(UpdateLog.VERSION_IDX);

            switch (oper) {
              case UpdateLog.UPDATE_INPLACE: // fall through to ADD
              case UpdateLog.ADD:
                {
                  recoveryInfo.adds++;
                  AddUpdateCommand cmd =
                      convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
                  cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
                  if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
                  execute(cmd, executor, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate);
                  break;
                }
              case UpdateLog.DELETE:
                {
                  recoveryInfo.deletes++;
                  // For a DELETE entry, element 2 is read as the id bytes.
                  byte[] idBytes = (byte[]) entry.get(2);
                  DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
                  cmd.setIndexedId(new BytesRef(idBytes));
                  cmd.setVersion(version);
                  cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
                  if (debug) log.debug("delete {}", cmd);
                  execute(cmd, executor, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate);
                  break;
                }

              case UpdateLog.DELETE_BY_QUERY:
                {
                  recoveryInfo.deleteByQuery++;
                  // For a DELETE_BY_QUERY entry, element 2 is read as the query string.
                  String query = (String) entry.get(2);
                  DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
                  cmd.query = query;
                  cmd.setVersion(version);
                  cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
                  if (debug) log.debug("deleteByQuery {}", cmd);
                  // Let every previously submitted update complete before the DBQ runs.
                  waitForAllUpdatesGetExecuted(pendingTasks);
                  // DBQ will be executed in the same thread
                  execute(cmd, null, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate);
                  break;
                }
              case UpdateLog.COMMIT:
                {
                  // Commits in the log are not replayed individually; only the version is
                  // remembered for the single commit issued after the loop.
                  commitVersion = version;
                  break;
                }

              default:
                throw new SolrException(
                    SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
            }

            // An exception recorded on the shared response is thrown here and handled by the
            // generic catch below, i.e. it is counted as an error and replay continues.
            if (rsp.getException() != null) {
              loglog.error("REPLAY_ERR: Exception replaying log {}", rsp.getException());
              throw rsp.getException();
            }
            // Mark the meter that corresponds to the current state.
            if (state == State.REPLAYING) {
              replayOpsMeter.mark();
            } else if (state == State.APPLYING_BUFFERED) {
              applyingBufferedOpsMeter.mark();
            } else {
              // XXX should not happen?
            }
          } catch (ClassCastException cl) {
            recoveryInfo.errors.incrementAndGet();
            loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log.  Entry={}", o, cl);
            // would be caused by a corrupt transaction log
          } catch (Exception ex) {
            recoveryInfo.errors.incrementAndGet();
            loglog.warn("REPLAY_ERR: Exception replaying log", ex);
            // something wrong with the request?
          }
          // Test-only injection point; as an assert statement it only runs when assertions are
          // enabled.
          assert TestInjection.injectUpdateLogReplayRandomPause();
        }

        // Make sure every submitted update has completed before committing, then surface any
        // failure recorded while executing them.
        waitForAllUpdatesGetExecuted(pendingTasks);
        if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();

        // Final hard commit (softCommit=false) that waits for the searcher, flagged as REPLAY and
        // carrying the version of the last COMMIT entry seen in the log.
        CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
        cmd.setVersion(commitVersion);
        cmd.softCommit = false;
        cmd.waitSearcher = true;
        cmd.setFlags(UpdateCommand.REPLAY);
        try {
          if (debug) log.debug("commit {}", cmd);
          // this should cause a commit to be added to the incomplete log and avoid it being
          // replayed again after a restart.
          uhandler.commit(cmd);
        } catch (IOException ex) {
          // Counted and logged only; execution continues with the steps below.
          recoveryInfo.errors.incrementAndGet();
          loglog.error("Replay exception: final commit.", ex);
        }

        if (!activeLog) {
          // if we are replaying an old tlog file, we need to add a commit to the end
          // so we don't replay it again if we restart right after.
          translog.writeCommit(cmd);
        }

        // Finish and close every processor that was created for this replay.
        // NOTE(review): this loop is not reached when an exception propagates out of the code
        // above (e.g. the fail-fast rethrow), so on that path the processors in procPool are
        // neither finished nor closed here.
        for (UpdateRequestProcessor proc : procPool) {
          try {
            proc.finish();
          } catch (IOException ex) {
            recoveryInfo.errors.incrementAndGet();
            loglog.error("Replay exception: finish()", ex);
          } finally {
            IOUtils.closeQuietly(proc);
          }
        }

      } finally {
        // Always release the reader (if one was opened) and drop our reference to the log.
        if (tlogReader != null) tlogReader.close();
        translog.decref();
      }
    }