in solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java [466:698]
private void processComponents(
SolrQueryRequest req,
SolrQueryResponse rsp,
ResponseBuilder rb,
RTimerTree timer,
List<SearchComponent> components)
throws IOException {
// creates a ShardHandler object only if it's needed
final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb);
if (!prepareComponents(req, rb, timer, components)) return;
{ // Once all of our components have been prepared, check if this request involves a SortSpec.
// If it does, and if our request includes a cursorMark param, then parse & init the
// CursorMark state (This must happen after the prepare() of all components, because any
// component may have modified the SortSpec)
final SortSpec spec = rb.getSortSpec();
final String cursorStr = rb.req.getParams().get(CursorMarkParams.CURSOR_MARK_PARAM);
if (null != spec && null != cursorStr) {
final CursorMark cursorMark = new CursorMark(rb.req.getSchema(), spec);
cursorMark.parseSerializedTotem(cursorStr);
rb.setCursorMark(cursorMark);
}
}
if (!rb.isDistrib) {
// a normal non-distributed request
try {
// The semantics of debugging vs not debugging are different enough that
// it makes sense to have two control loops
if (!rb.isDebug()) {
// Process
for (SearchComponent c : components) {
if (checkLimitsBefore(c, "process", rb.req, rb.rsp, components)) {
shortCircuitedResults(req, rb);
return;
}
c.process(rb);
}
} else {
// Process
RTimerTree subt = timer.sub("process");
for (SearchComponent c : components) {
if (checkLimitsBefore(c, "process debug", rb.req, rb.rsp, components)) {
shortCircuitedResults(req, rb);
return;
}
rb.setTimer(subt.sub(c.getName()));
c.process(rb);
rb.getTimer().stop();
}
subt.stop();
// add the timing info
if (rb.isDebugTimings()) {
rb.addDebugInfo("timing", timer.asNamedList());
}
}
} catch (ExitableDirectoryReader.ExitingReaderException ex) {
log.warn("Query: {}; ", req.getParamString(), ex);
shortCircuitedResults(req, rb);
}
} else {
// a distributed request
if (rb.outgoing == null) {
rb.outgoing = new ArrayList<>();
}
rb.finished = new ArrayList<>();
int nextStage = 0;
long totalShardCpuTime = 0L;
do {
rb.stage = nextStage;
nextStage = ResponseBuilder.STAGE_DONE;
// call all components
for (SearchComponent c : components) {
if (checkLimitsBefore(c, "distrib", rb.req, rb.rsp, components)) {
shortCircuitedResults(req, rb);
return;
} // the next stage is the minimum of what all components report
nextStage = Math.min(nextStage, c.distributedProcess(rb));
}
// check the outgoing queue and send requests
while (rb.outgoing.size() > 0) {
// submit all current request tasks at once
while (rb.outgoing.size() > 0) {
ShardRequest sreq = rb.outgoing.remove(0);
sreq.actualShards = sreq.shards;
if (sreq.actualShards == ShardRequest.ALL_SHARDS) {
sreq.actualShards = rb.shards;
}
// presume we'll get a response from each shard we send to
sreq.responses = new ArrayList<>(sreq.actualShards.length);
// TODO: map from shard to address[]
for (String shard : sreq.actualShards) {
ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
ShardHandler.setShardAttributesToParams(params, sreq.purpose);
// Distributed request -- need to send queryID as a part of the distributed request
params.setNonNull(ShardParams.QUERY_ID, rb.queryID);
if (rb.requestInfo != null) {
// we could try and detect when this is needed, but it could be tricky
params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
}
String shardQt = params.get(ShardParams.SHARDS_QT);
if (shardQt != null) {
params.set(CommonParams.QT, shardQt);
} else {
// for distributed queries that don't include shards.qt, use the original path
// as the default but operators need to update their luceneMatchVersion to enable
// this behavior since it did not work this way prior to 5.1
String reqPath = (String) req.getContext().get(PATH);
if (!"/select".equals(reqPath)) {
params.set(CommonParams.QT, reqPath);
} // else if path is /select, then the qt gets passed thru if set
}
shardHandler1.submit(sreq, shard, params);
}
}
// now wait for replies, but if anyone puts more requests on
// the outgoing queue, send them out immediately (by exiting
// this loop)
boolean tolerant = HttpShardHandler.getShardsTolerantAsBool(rb.req);
while (rb.outgoing.size() == 0) {
ShardResponse srsp =
tolerant
? shardHandler1.takeCompletedIncludingErrors()
: shardHandler1.takeCompletedOrError();
if (srsp == null) break; // no more requests to wait for
AtomicReference<Object> detailMesg =
new AtomicReference<>(); // or perhaps new Object[1] ?
boolean anyResponsesPartial =
srsp.getShardRequest().responses.stream()
.anyMatch(
response -> {
NamedList<Object> resp = response.getSolrResponse().getResponse();
if (resp == null) {
return false;
}
Object recursive = resp.findRecursive("responseHeader", "partialResults");
if (recursive != null) {
Object message =
"[Shard:"
+ response.getShardAddress()
+ "]"
+ resp.findRecursive(
"responseHeader",
RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY);
detailMesg.compareAndSet(null, message); // first one, ingore rest
}
return recursive != null;
});
if (anyResponsesPartial) {
rb.rsp.addPartialResponseDetail(detailMesg.get());
rsp.setPartialResults(rb.req);
}
// Was there an exception?
if (srsp.getException() != null) {
// If things are not tolerant, abort everything and rethrow
if (!tolerant) {
shardHandler1.cancelAll();
throwSolrException(srsp.getException());
} else {
// Check if the purpose includes 'PURPOSE_GET_TOP_IDS'
boolean includesTopIdsPurpose =
(srsp.getShardRequest().purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0;
// Check if all responses have exceptions
boolean allResponsesHaveExceptions =
srsp.getShardRequest().responses.stream()
.allMatch(response -> response.getException() != null);
// Check if all shards have failed for PURPOSE_GET_TOP_IDS
boolean allShardsFailed = includesTopIdsPurpose && allResponsesHaveExceptions;
// if all shards fail, fail the request despite shards.tolerant
if (allShardsFailed) {
throwSolrException(srsp.getException());
} else {
rsp.setPartialResults(rb.req);
if (publishCpuTime) {
totalShardCpuTime += computeShardCpuTime(srsp.getShardRequest().responses);
rsp.getResponseHeader().add(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
rsp.addToLog(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
}
}
}
}
rb.finished.add(srsp.getShardRequest());
// let the components see the responses to the request
for (SearchComponent c : components) {
if (checkLimitsBefore(
c,
"handleResponses next stage:" + stageToString(nextStage),
rb.req,
rb.rsp,
components)) {
shortCircuitedResults(req, rb);
return;
}
c.handleResponses(rb, srsp.getShardRequest());
}
// Compute total CpuTime used by all shards.
if (publishCpuTime) {
totalShardCpuTime += computeShardCpuTime(srsp.getShardRequest().responses);
}
}
}
for (SearchComponent c : components) {
if (checkLimitsBefore(
c, "finishStage stage:" + stageToString(nextStage), rb.req, rb.rsp, components)) {
return;
}
c.finishStage(rb);
}
// we are done when the next stage is MAX_VALUE
} while (nextStage != Integer.MAX_VALUE);
if (publishCpuTime) {
rsp.getResponseHeader().add(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
rsp.addToLog(ThreadCpuTimer.CPU_TIME, totalShardCpuTime);
}
}
}