in encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java [347:414]
/**
 * Distributes the encryption request to the leader replica of each active slice of the
 * collection the request core belongs to, then aggregates the per-shard states into a single
 * collection-level state that is reported in the response.
 *
 * <p>The aggregated state is the one with the highest {@code priority} among the states
 * returned by the shard requests. A slice without a leader, or a shard request that fails
 * with an exception, results in {@code State.ERROR}. If the requests are cancelled because
 * the allowed time elapsed, the state is at least {@code State.TIMEOUT}. If the calling
 * thread is interrupted while waiting, the state is {@code State.INTERRUPTED} and the
 * thread's interrupt flag is restored.
 *
 * <p>The status (success or failure) and, when available, the encryption state are always
 * added to the response and to the request log, even if this method throws.
 *
 * @param req         the request; its {@code TIME_ALLOWED} parameter (milliseconds), when
 *                    strictly positive, bounds the wait for the shard requests.
 * @param rsp         the response to which the status and encryption state are added.
 * @param keyId       the key id of the encryption request (used for the shard requests and
 *                    for logging).
 * @param startTimeNs the start time used to compute the elapsed time that is logged.
 * @throws SolrException if the core has no collection name (not Solr Cloud mode), or if the
 *                       collection cannot be found in the cluster state.
 */
private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) {
  boolean success = false;
  State collectionState = null;
  long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
  // No timeout when the allowed time is absent, zero or negative.
  TimeOut timeOut = timeAllowedMs <= 0 ? null : new TimeOut(timeAllowedMs, TimeUnit.MILLISECONDS, getTimeSource());
  try {
    String collectionName = req.getCore().getCoreDescriptor().getCollectionName();
    if (collectionName == null) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
    }
    log.debug("Encrypt request distributed for keyId={}", keyId);
    DocCollection docCollection = req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
    if (docCollection == null) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " present but collection '" + collectionName + "' not found.");
    }
    ModifiableSolrParams params = createDistributedRequestParams(req, rsp, keyId);
    Collection<Slice> slices = docCollection.getActiveSlices();
    Collection<Callable<State>> encryptRequests = new ArrayList<>(slices.size());
    for (Slice slice : slices) {
      Replica replica = slice.getLeader();
      if (replica == null) {
        // A shard without leader cannot be requested: record the error but still send the
        // request to the other shards.
        log.error("No leader found for shard {}", slice.getName());
        collectionState = State.ERROR;
        continue;
      }
      encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, req, params, keyId));
    }
    try {
      List<Future<State>> responses = timeOut == null ?
          executor.invokeAll(encryptRequests)
          : executor.invokeAll(encryptRequests, timeOut.timeLeft(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
      for (Future<State> response : responses) {
        State state;
        try {
          state = response.get();
        } catch (ExecutionException e) {
          // A shard request failed: report an error and stop examining the other responses.
          log.error("Error distributing encryption request for keyId={}", keyId, e);
          collectionState = State.ERROR;
          break;
        } catch (CancellationException e) {
          // The task was cancelled (invokeAll cancels uncompleted tasks when its timeout
          // elapses): report a timeout unless a higher priority state is already recorded.
          log.warn("Cancelled distributing encryption request for keyId={}", keyId, e);
          if (collectionState == null || State.TIMEOUT.priority > collectionState.priority) {
            collectionState = State.TIMEOUT;
          }
          break;
        }
        // Keep the state with the highest priority among all shards.
        if (collectionState == null || state.priority > collectionState.priority) {
          collectionState = state;
        }
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the caller and the owning thread pool can observe the
      // interruption; catching InterruptedException clears it.
      Thread.currentThread().interrupt();
      collectionState = State.INTERRUPTED;
    }
    // A null state means there was no shard request to send and no error was recorded.
    success = collectionState == null || collectionState.isSuccess();
  } finally {
    // Always report the status, including when an exception propagates (success is false then).
    String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE;
    rsp.add(STATUS, statusValue);
    rsp.addToLog(STATUS, statusValue);
    if (collectionState != null) {
      rsp.add(ENCRYPTION_STATE, collectionState.value);
      rsp.addToLog(ENCRYPTION_STATE, collectionState.value);
    }
    if (log.isInfoEnabled()) {
      log.info("Responding encryption distributed state={} success={} for keyId={} timeMs={}",
          (collectionState == null ? null : collectionState.value), success, keyId, toMs(elapsedTimeNs(startTimeNs)));
    }
  }
}