private void processComponents()

in solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java [466:698]
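
This method is the heart of SearchHandler: it drives every registered
SearchComponent through either the single-node path (prepare/process) or the
distributed stage loop (distributedProcess/handleResponses/finishStage),
honoring query limits and cursor state along the way.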


  private void processComponents(
      SolrQueryRequest req,
      SolrQueryResponse rsp,
      ResponseBuilder rb,
      RTimerTree timer,
      List<SearchComponent> components)
      throws IOException {
    // creates a ShardHandler object only if it's needed
    final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb);

    if (!prepareComponents(req, rb, timer, components)) return;
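    // (prepareComponents runs prepare() on each component, honoring query limits;
    // returning false means the response was already short-circuited)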

    { // Once all of our components have been prepared, check if this request involves a SortSpec.
      // If it does, and if our request includes a cursorMark param, then parse & init the
      // CursorMark state (This must happen after the prepare() of all components, because any
      // component may have modified the SortSpec)
      final SortSpec spec = rb.getSortSpec();
      final String cursorStr = rb.req.getParams().get(CursorMarkParams.CURSOR_MARK_PARAM);
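      // (a cursor scan starts with the reserved totem "*", i.e.
      // CursorMarkParams.CURSOR_MARK_START; parseSerializedTotem() rejects totems
      // that don't match the final SortSpec)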
      if (null != spec && null != cursorStr) {
        final CursorMark cursorMark = new CursorMark(rb.req.getSchema(), spec);
        cursorMark.parseSerializedTotem(cursorStr);
        rb.setCursorMark(cursorMark);
      }
    }

    if (!rb.isDistrib) {
      // a normal non-distributed request
      try {
        // The semantics of debugging vs not debugging are different enough that
        // it makes sense to have two control loops
        if (!rb.isDebug()) {
          // Process
          for (SearchComponent c : components) {
            if (checkLimitsBefore(c, "process", rb.req, rb.rsp, components)) {
              shortCircuitedResults(req, rb);
              return;
            }
            c.process(rb);
          }
        } else {
          // Process
          RTimerTree subt = timer.sub("process");
          for (SearchComponent c : components) {
            if (checkLimitsBefore(c, "process debug", rb.req, rb.rsp, components)) {
              shortCircuitedResults(req, rb);
              return;
            }
            rb.setTimer(subt.sub(c.getName()));
            c.process(rb);
            rb.getTimer().stop();
          }
          subt.stop();

          // add the timing info
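          // (surfaces as debug/timing in the response: prepare/process sub-timers,
          // each with one child per component)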
          if (rb.isDebugTimings()) {
            rb.addDebugInfo("timing", timer.asNamedList());
          }
        }
      } catch (ExitableDirectoryReader.ExitingReaderException ex) {
        log.warn("Query: {}; ", req.getParamString(), ex);
        shortCircuitedResults(req, rb);
      }
    } else {
      // a distributed request

      if (rb.outgoing == null) {
        rb.outgoing = new ArrayList<>();
      }
      rb.finished = new ArrayList<>();

      int nextStage = 0;
      long totalShardCpuTime = 0L;
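      // Stage values ascend (STAGE_START, STAGE_PARSE_QUERY, STAGE_EXECUTE_QUERY,
      // STAGE_GET_FIELDS, ..., STAGE_DONE == Integer.MAX_VALUE); each pass of this
      // loop runs the lowest stage any component still needs.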
      do {
        rb.stage = nextStage;
        nextStage = ResponseBuilder.STAGE_DONE;

        // call all components
        for (SearchComponent c : components) {
          if (checkLimitsBefore(c, "distrib", rb.req, rb.rsp, components)) {
            shortCircuitedResults(req, rb);
            return;
          }
          // the next stage is the minimum of what all components report
          nextStage = Math.min(nextStage, c.distributedProcess(rb));
        }

        // check the outgoing queue and send requests
        while (rb.outgoing.size() > 0) {

          // submit all current request tasks at once
          while (rb.outgoing.size() > 0) {
            ShardRequest sreq = rb.outgoing.remove(0);
            sreq.actualShards = sreq.shards;
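            // ALL_SHARDS is a sentinel meaning "fan out to every shard of the request"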
            if (sreq.actualShards == ShardRequest.ALL_SHARDS) {
              sreq.actualShards = rb.shards;
            }
            // presume we'll get a response from each shard we send to
            sreq.responses = new ArrayList<>(sreq.actualShards.length);

            // TODO: map from shard to address[]
            for (String shard : sreq.actualShards) {
              ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
              ShardHandler.setShardAttributesToParams(params, sreq.purpose);

              // Distributed request -- need to send queryID as a part of the distributed request
              params.setNonNull(ShardParams.QUERY_ID, rb.queryID);
              if (rb.requestInfo != null) {
                // we could try to detect when this is needed, but it could be tricky
                params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
              }
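              // pick the handler the shards will execute: an explicit shards.qt wins,
              // otherwise fall back to the original request path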
              String shardQt = params.get(ShardParams.SHARDS_QT);
              if (shardQt != null) {
                params.set(CommonParams.QT, shardQt);
              } else {
                // for distributed queries that don't include shards.qt, use the original
                // path as the default; operators must update their luceneMatchVersion to
                // enable this behavior, since it did not work this way prior to 5.1
                String reqPath = (String) req.getContext().get(PATH);
                if (!"/select".equals(reqPath)) {
                  params.set(CommonParams.QT, reqPath);
                } // else if the path is /select, the qt param gets passed through if set
              }
              shardHandler1.submit(sreq, shard, params);
            }
          }

          // now wait for replies, but if anyone puts more requests on
          // the outgoing queue, send them out immediately (by exiting
          // this loop)
          boolean tolerant = HttpShardHandler.getShardsTolerantAsBool(rb.req);
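          // tolerant requests keep collecting responses when individual shards fail and
          // degrade to partial results; intolerant requests abort on the first error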
          while (rb.outgoing.size() == 0) {
            ShardResponse srsp =
                tolerant
                    ? shardHandler1.takeCompletedIncludingErrors()
                    : shardHandler1.takeCompletedOrError();
            if (srsp == null) break; // no more requests to wait for
            // first partial-results detail message seen across shards, if any
            AtomicReference<Object> detailMesg = new AtomicReference<>();

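            // a shard that hit a query limit (e.g. timeAllowed) flags its responseHeader
            // with partialResults=true; if any shard did, mark the merged response partial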
            boolean anyResponsesPartial =
                srsp.getShardRequest().responses.stream()
                    .anyMatch(
                        response -> {
                          NamedList<Object> resp = response.getSolrResponse().getResponse();
                          if (resp == null) {
                            return false;
                          }
                          Object recursive = resp.findRecursive("responseHeader", "partialResults");
                          if (recursive != null) {
                            Object message =
                                "[Shard:"
                                    + response.getShardAddress()
                                    + "]"
                                    + resp.findRecursive(
                                        "responseHeader",
                                        RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY);
                            detailMesg.compareAndSet(null, message); // first one, ignore rest
                          }
                          return recursive != null;
                        });
            if (anyResponsesPartial) {
              rb.rsp.addPartialResponseDetail(detailMesg.get());
              rsp.setPartialResults(rb.req);
            }
            // Was there an exception?
            if (srsp.getException() != null) {
              // If things are not tolerant, abort everything and rethrow
              if (!tolerant) {
                shardHandler1.cancelAll();
                throwSolrException(srsp.getException());
              } else {
                // Check if the purpose includes 'PURPOSE_GET_TOP_IDS'
                boolean includesTopIdsPurpose =
                    (srsp.getShardRequest().purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0;
                // Check if all responses have exceptions
                boolean allResponsesHaveExceptions =
                    srsp.getShardRequest().responses.stream()
                        .allMatch(response -> response.getException() != null);
                // Check if all shards have failed for PURPOSE_GET_TOP_IDS
                boolean allShardsFailed = includesTopIdsPurpose && allResponsesHaveExceptions;
                // if all shards fail, fail the request despite shards.tolerant
                if (allShardsFailed) {
                  throwSolrException(srsp.getException());
                } else {
                  rsp.setPartialResults(rb.req);
                  if (publishCpuTime) {
                    totalShardCpuTime += computeShardCpuTime(srsp.getShardRequest().responses);
                    rsp.getResponseHeader().add(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
                    rsp.addToLog(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
                  }
                }
              }
            }

            rb.finished.add(srsp.getShardRequest());

            // let the components see the responses to the request
            for (SearchComponent c : components) {
              if (checkLimitsBefore(
                  c,
                  "handleResponses next stage:" + stageToString(nextStage),
                  rb.req,
                  rb.rsp,
                  components)) {
                shortCircuitedResults(req, rb);
                return;
              }
              c.handleResponses(rb, srsp.getShardRequest());
            }

            // Compute total CpuTime used by all shards.
            if (publishCpuTime) {
              totalShardCpuTime += computeShardCpuTime(srsp.getShardRequest().responses);
            }
          }
        }

        for (SearchComponent c : components) {
          if (checkLimitsBefore(
              c, "finishStage stage:" + stageToString(nextStage), rb.req, rb.rsp, components)) {
            return;
          }
          c.finishStage(rb);
        }

        // we are done when every component reports STAGE_DONE (Integer.MAX_VALUE)
      } while (nextStage != Integer.MAX_VALUE);

      if (publishCpuTime) {
        rsp.getResponseHeader().add(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
        rsp.addToLog(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
      }
    }
  }
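
For orientation, a minimal sketch of a component driven by the loop above. The
DemoComponent name, its stage choice, and its comments are hypothetical; the
overridden hooks (prepare, process, distributedProcess, handleResponses) are the
actual SearchComponent extension points that processComponents() invokes.

import java.io.IOException;

import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.SearchComponent;
import org.apache.solr.handler.component.ShardRequest;

// Hypothetical component, shown only to illustrate the lifecycle above.
public class DemoComponent extends SearchComponent {

  @Override
  public void prepare(ResponseBuilder rb) throws IOException {
    // runs via prepareComponents(), before the cursorMark block; prepare() may
    // still modify rb.getSortSpec() at this point
  }

  @Override
  public void process(ResponseBuilder rb) throws IOException {
    // the non-distributed path: do the real work against rb.req.getSearcher()
  }

  @Override
  public int distributedProcess(ResponseBuilder rb) throws IOException {
    if (rb.stage == ResponseBuilder.STAGE_EXECUTE_QUERY) {
      ShardRequest sreq = new ShardRequest();
      sreq.purpose = ShardRequest.PURPOSE_GET_TOP_IDS;
      sreq.shards = ShardRequest.ALL_SHARDS; // resolved to rb.shards by the loop above
      sreq.params = new ModifiableSolrParams(rb.req.getParams());
      rb.addRequest(this, sreq); // lands on rb.outgoing and is submitted above
      return ResponseBuilder.STAGE_GET_FIELDS; // the stage this component needs next
    }
    return ResponseBuilder.STAGE_DONE; // contributes Integer.MAX_VALUE to Math.min
  }

  @Override
  public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {
    // called once per completed ShardRequest, with sreq.responses filled in
  }

  @Override
  public String getDescription() {
    return "demo component";
  }
}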