private void distributeRequest()

in encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java [347:414]


  /**
   * Distributes the encryption request to the leader replica of every active shard of the
   * collection this core belongs to, then reports the aggregated outcome in the response.
   *
   * <p>One request per shard leader is submitted to {@code executor} and all of them are awaited
   * together. When the {@code TIME_ALLOWED} request parameter is strictly positive, the wait is
   * bounded by that many milliseconds; otherwise it is unbounded. The aggregated state is the
   * shard state with the highest {@code priority}. A shard without leader or a failed request
   * yields {@code State.ERROR}, a cancelled request yields {@code State.TIMEOUT}, and an
   * interruption of the calling thread yields {@code State.INTERRUPTED}.
   *
   * <p>The {@code STATUS} entry (and {@code ENCRYPTION_STATE} when a state is known) is always
   * added to the response and to its log, even when this method exits with an exception.
   *
   * @param req the request being handled; provides the core, and the parameters to forward.
   * @param rsp the response to which the status and the aggregated state are added.
   * @param keyId the key id forwarded to each shard leader and reported in the logs.
   * @param startTimeNs the request start time, in nanoseconds, used to log the elapsed time.
   * @throws SolrException if the core has no collection name (not in Solr Cloud mode) or if the
   *     collection cannot be found.
   */
  private void distributeRequest(SolrQueryRequest req, SolrQueryResponse rsp, String keyId, long startTimeNs) {
    boolean success = false;
    State collectionState = null;
    long timeAllowedMs = req.getParams().getLong(TIME_ALLOWED, 0);
    // A null timeOut means "wait without limit".
    TimeOut timeOut = timeAllowedMs <= 0 ? null : new TimeOut(timeAllowedMs, TimeUnit.MILLISECONDS, getTimeSource());
    try {
      String collectionName = req.getCore().getCoreDescriptor().getCollectionName();
      if (collectionName == null) {
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " can only be used in Solr Cloud mode.");
      }
      log.debug("Encrypt request distributed for keyId={}", keyId);
      DocCollection docCollection = req.getCore().getCoreContainer().getZkController().getZkStateReader().getCollection(collectionName);
      if (docCollection == null) {
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Parameter " + DISTRIB + " present but collection '" + collectionName + "' not found.");
      }
      ModifiableSolrParams params = createDistributedRequestParams(req, rsp, keyId);
      Collection<Slice> slices = docCollection.getActiveSlices();
      Collection<Callable<State>> encryptRequests = new ArrayList<>(slices.size());
      for (Slice slice : slices) {
        Replica replica = slice.getLeader();
        if (replica == null) {
          // Record the error but keep going, so the shards that do have a leader still get the request.
          log.error("No leader found for shard {}", slice.getName());
          collectionState = State.ERROR;
          continue;
        }
        encryptRequests.add(() -> sendEncryptionRequestWithRetry(replica, req, params, keyId));
      }
      try {
        List<Future<State>> responses = timeOut == null ?
            executor.invokeAll(encryptRequests)
            : executor.invokeAll(encryptRequests, timeOut.timeLeft(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        for (Future<State> response : responses) {
          State state;
          try {
            state = response.get();
          } catch (ExecutionException e) {
            log.error("Error distributing encryption request for keyId={}", keyId, e);
            collectionState = State.ERROR;
            break;
          } catch (CancellationException e) {
            // The timed invokeAll cancels the tasks that did not complete before the timeout.
            log.warn("Cancelled distributing encryption request for keyId={}", keyId, e);
            if (collectionState == null || State.TIMEOUT.priority > collectionState.priority) {
              collectionState = State.TIMEOUT;
            }
            break;
          }
          // Keep the state with the highest priority among all shard responses.
          if (collectionState == null || state.priority > collectionState.priority) {
            collectionState = state;
          }
        }
      } catch (InterruptedException e) {
        // Restore the interrupt flag so that callers up the stack can observe the interruption;
        // catching InterruptedException clears it.
        Thread.currentThread().interrupt();
        collectionState = State.INTERRUPTED;
      }
      // No state at all means there was nothing to report as failed.
      success = collectionState == null || collectionState.isSuccess();
    } finally {
      // Always report the status, including when an exception propagates (success stays false).
      String statusValue = success ? STATUS_SUCCESS : STATUS_FAILURE;
      rsp.add(STATUS, statusValue);
      rsp.addToLog(STATUS, statusValue);
      if (collectionState != null) {
        rsp.add(ENCRYPTION_STATE, collectionState.value);
        rsp.addToLog(ENCRYPTION_STATE, collectionState.value);
      }
      if (log.isInfoEnabled()) {
        log.info("Responding encryption distributed state={} success={} for keyId={} timeMs={}",
            (collectionState == null ? null : collectionState.value), success, keyId, toMs(elapsedTimeNs(startTimeNs)));
      }
    }
  }