void sendUpdateStream()

in solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java [222:420]


    void sendUpdateStream() throws Exception {

      while (!queue.isEmpty()) {
        HttpPost method = null;
        HttpResponse response = null;

        InputStream rspBody = null;
        try {
          Update update;
          notifyQueueAndRunnersIfEmptyQueue();
          try {
            inPoll = true;
            update = queue.poll(pollQueueTimeMillis, TimeUnit.MILLISECONDS);
          } catch (InterruptedException e) {
            if (log.isDebugEnabled()) pollInterrupts.incrementAndGet();
            continue;
          } finally {
            inPoll = false;
          }
          if (update == null) break;

          String contentType = client.requestWriter.getUpdateContentType();
          final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);

          final ModifiableSolrParams origParams =
              new ModifiableSolrParams(update.getRequest().getParams());
          final String origTargetCollection = update.getCollection();

          EntityTemplate template =
              new EntityTemplate(
                  new ContentProducer() {

                    @Override
                    public void writeTo(OutputStream out) throws IOException {

                      if (isXml) {
                        out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
                      }
                      Update upd = update;
                      while (upd != null) {
                        UpdateRequest req = upd.getRequest();
                        if (!origParams.equals(req.getParams())
                            || !Objects.equals(origTargetCollection, upd.getCollection())) {
                          // Request has different params or destination core/collection, return to
                          // queue
                          queue.add(upd);
                          break;
                        }

                        client.requestWriter.write(req, out);
                        if (isXml) {
                          // check for commit or optimize
                          SolrParams params = req.getParams();
                          if (params != null) {
                            String fmt = null;
                            if (params.getBool(UpdateParams.OPTIMIZE, false)) {
                              fmt = "<optimize waitSearcher=\"%s\" />";
                            } else if (params.getBool(UpdateParams.COMMIT, false)) {
                              fmt = "<commit waitSearcher=\"%s\" />";
                            }
                            if (fmt != null) {
                              byte[] content =
                                  String.format(
                                          Locale.ROOT,
                                          fmt,
                                          params.getBool(UpdateParams.WAIT_SEARCHER, false) + "")
                                      .getBytes(StandardCharsets.UTF_8);
                              out.write(content);
                            }
                          }
                        }
                        out.flush();

                        notifyQueueAndRunnersIfEmptyQueue();
                        inPoll = true;
                        try {
                          while (true) {
                            try {
                              upd = queue.poll(pollQueueTimeMillis, TimeUnit.MILLISECONDS);
                              break;
                            } catch (InterruptedException e) {
                              if (log.isDebugEnabled()) pollInterrupts.incrementAndGet();
                              if (!queue.isEmpty()) {
                                continue;
                              }
                              if (log.isDebugEnabled()) pollExits.incrementAndGet();
                              upd = null;
                              break;
                            } finally {
                              inPoll = false;
                            }
                          }
                        } finally {
                          inPoll = false;
                        }
                      }

                      if (isXml) {
                        out.write("</stream>".getBytes(StandardCharsets.UTF_8));
                      }
                    }
                  });

          // The parser 'wt=' and 'version=' params are used instead of the
          // original params
          ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
          requestParams.set(CommonParams.WT, client.parser.getWriterType());
          requestParams.set(CommonParams.VERSION, client.parser.getVersion());

          String basePath = client.getBaseURL();
          if (update.getCollection() != null) {
            basePath += "/" + update.getCollection();
          } else if (client.getDefaultCollection() != null) {
            basePath += "/" + client.getDefaultCollection();
          }

          method = new HttpPost(basePath + "/update" + requestParams.toQueryString());

          org.apache.http.client.config.RequestConfig.Builder requestConfigBuilder =
              HttpClientUtil.createDefaultRequestConfigBuilder();
          requestConfigBuilder.setSocketTimeout(soTimeout);
          requestConfigBuilder.setConnectTimeout(connectionTimeout);

          method.setConfig(requestConfigBuilder.build());

          method.setEntity(template);
          method.addHeader("User-Agent", HttpSolrClient.USER_AGENT);
          method.addHeader("Content-Type", contentType);

          response =
              client
                  .getHttpClient()
                  .execute(method, HttpClientUtil.createNewHttpClientRequestContext());

          rspBody = response.getEntity().getContent();

          int statusCode = response.getStatusLine().getStatusCode();
          if (statusCode != HttpStatus.SC_OK) {
            StringBuilder msg = new StringBuilder();
            msg.append(response.getStatusLine().getReasonPhrase());
            msg.append("\n\n\n\n");
            msg.append("request: ").append(method.getURI());

            SolrException solrExc;
            NamedList<String> metadata = null;
            // parse out the metadata from the SolrException
            try {
              String encoding = "UTF-8"; // default
              if (response.getEntity().getContentType().getElements().length > 0) {
                NameValuePair param =
                    response
                        .getEntity()
                        .getContentType()
                        .getElements()[0]
                        .getParameterByName("charset");
                if (param != null) {
                  encoding = param.getValue();
                }
              }
              NamedList<Object> resp = client.parser.processResponse(rspBody, encoding);
              NamedList<Object> error = (NamedList<Object>) resp.get("error");
              if (error != null) {
                metadata = (NamedList<String>) error.get("metadata");
                String remoteMsg = (String) error.get("msg");
                if (remoteMsg != null) {
                  msg.append("\nRemote error message: ");
                  msg.append(remoteMsg);
                }
              }
            } catch (Exception exc) {
              // don't want to fail to report error if parsing the response fails
              log.warn("Failed to parse error response from {} due to: ", client.getBaseURL(), exc);
            } finally {
              solrExc =
                  new SolrClient.RemoteSolrException(
                      client.getBaseURL(), statusCode, msg.toString(), null);
              if (metadata != null) {
                solrExc.setMetadata(metadata);
              }
            }

            handleError(solrExc);
          } else {
            onSuccess(response);
          }
          stallDetection.incrementProcessedCount();

        } finally {
          try {
            if (response != null) {
              Utils.consumeFully(response.getEntity());
            }
          } catch (Exception e) {
            log.error("Error consuming and closing http response stream.", e);
          }
          notifyQueueAndRunnersIfEmptyQueue();
        }
      }
    }