in solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java [222:420]
/**
 * Drains the update queue, sending queued updates to the server with HTTP POST requests.
 *
 * <p>Each iteration of the outer loop polls one update and issues one POST to
 * {@code <baseURL>[/<collection>]/update}. The request body is produced by a {@link
 * ContentProducer} callback that writes the first update and then keeps polling the queue,
 * appending every further update that has the same request params and the same target
 * collection. The first update that differs is put back on the queue and ends the current
 * request body.
 *
 * <p>A response with a status other than 200 is converted to a {@code RemoteSolrException}
 * (enriched with the remote error message and metadata when the body can be parsed) and passed to
 * {@code handleError}; a 200 response is passed to {@code onSuccess}. The response entity is
 * always consumed before the next iteration.
 *
 * @throws Exception if executing the request or reading the response fails
 */
void sendUpdateStream() throws Exception {
  while (!queue.isEmpty()) {
    HttpPost method = null;
    HttpResponse response = null;
    InputStream rspBody = null;
    try {
      Update update;
      notifyQueueAndRunnersIfEmptyQueue();
      try {
        // inPoll brackets the blocking poll so it is only true while this thread waits on the
        // queue.
        inPoll = true;
        update = queue.poll(pollQueueTimeMillis, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        if (log.isDebugEnabled()) {
          pollInterrupts.incrementAndGet();
        }
        // Interrupted while waiting: re-evaluate the outer loop condition.
        continue;
      } finally {
        inPoll = false;
      }
      if (update == null) {
        // Nothing arrived within the poll timeout.
        break;
      }

      String contentType = client.requestWriter.getUpdateContentType();
      final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);

      // Snapshot of the first update's params/collection; every update streamed in this request
      // must match them.
      final ModifiableSolrParams origParams =
          new ModifiableSolrParams(update.getRequest().getParams());
      final String origTargetCollection = update.getCollection();

      EntityTemplate template =
          new EntityTemplate(
              new ContentProducer() {
                @Override
                public void writeTo(OutputStream out) throws IOException {
                  if (isXml) {
                    out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
                  }
                  Update upd = update;
                  while (upd != null) {
                    UpdateRequest req = upd.getRequest();
                    if (!origParams.equals(req.getParams())
                        || !Objects.equals(origTargetCollection, upd.getCollection())) {
                      // Request has different params or destination core/collection, return to
                      // queue
                      queue.add(upd);
                      break;
                    }
                    client.requestWriter.write(req, out);
                    if (isXml) {
                      // check for commit or optimize
                      SolrParams params = req.getParams();
                      if (params != null) {
                        String fmt = null;
                        if (params.getBool(UpdateParams.OPTIMIZE, false)) {
                          fmt = "<optimize waitSearcher=\"%s\" />";
                        } else if (params.getBool(UpdateParams.COMMIT, false)) {
                          fmt = "<commit waitSearcher=\"%s\" />";
                        }
                        if (fmt != null) {
                          byte[] content =
                              String.format(
                                      Locale.ROOT,
                                      fmt,
                                      params.getBool(UpdateParams.WAIT_SEARCHER, false) + "")
                                  .getBytes(StandardCharsets.UTF_8);
                          out.write(content);
                        }
                      }
                    }
                    out.flush();

                    notifyQueueAndRunnersIfEmptyQueue();
                    // Fetch the next update to append to this request. An interrupt only ends the
                    // stream when the queue is empty; otherwise the poll is retried.
                    while (true) {
                      // Set per attempt so that a retry after an interrupt also polls with
                      // inPoll == true, consistent with the initial poll of the outer loop.
                      inPoll = true;
                      try {
                        upd = queue.poll(pollQueueTimeMillis, TimeUnit.MILLISECONDS);
                        break;
                      } catch (InterruptedException e) {
                        if (log.isDebugEnabled()) {
                          pollInterrupts.incrementAndGet();
                        }
                        if (!queue.isEmpty()) {
                          continue;
                        }
                        if (log.isDebugEnabled()) {
                          pollExits.incrementAndGet();
                        }
                        upd = null;
                        break;
                      } finally {
                        inPoll = false;
                      }
                    }
                  }
                  if (isXml) {
                    out.write("</stream>".getBytes(StandardCharsets.UTF_8));
                  }
                }
              });

      // The parser 'wt=' and 'version=' params are used instead of the
      // original params
      ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
      requestParams.set(CommonParams.WT, client.parser.getWriterType());
      requestParams.set(CommonParams.VERSION, client.parser.getVersion());

      // The update's own collection takes precedence over the client's default collection.
      String basePath = client.getBaseURL();
      if (update.getCollection() != null) {
        basePath += "/" + update.getCollection();
      } else if (client.getDefaultCollection() != null) {
        basePath += "/" + client.getDefaultCollection();
      }

      method = new HttpPost(basePath + "/update" + requestParams.toQueryString());
      org.apache.http.client.config.RequestConfig.Builder requestConfigBuilder =
          HttpClientUtil.createDefaultRequestConfigBuilder();
      requestConfigBuilder.setSocketTimeout(soTimeout);
      requestConfigBuilder.setConnectTimeout(connectionTimeout);
      method.setConfig(requestConfigBuilder.build());
      method.setEntity(template);
      method.addHeader("User-Agent", HttpSolrClient.USER_AGENT);
      method.addHeader("Content-Type", contentType);

      response =
          client
              .getHttpClient()
              .execute(method, HttpClientUtil.createNewHttpClientRequestContext());

      // HttpResponse.getEntity() returns null for a response without a body; leave rspBody null
      // in that case instead of failing with a NullPointerException.
      if (response.getEntity() != null) {
        rspBody = response.getEntity().getContent();
      }

      int statusCode = response.getStatusLine().getStatusCode();
      if (statusCode != HttpStatus.SC_OK) {
        StringBuilder msg = new StringBuilder();
        msg.append(response.getStatusLine().getReasonPhrase());
        msg.append("\n\n\n\n");
        msg.append("request: ").append(method.getURI());

        SolrException solrExc;
        NamedList<String> metadata = null;
        // parse out the metadata from the SolrException
        try {
          if (rspBody != null) {
            String encoding = "UTF-8"; // default
            // The Content-Type header may be absent; keep the default encoding then rather than
            // abandoning the parse of the error body.
            if (response.getEntity().getContentType() != null
                && response.getEntity().getContentType().getElements().length > 0) {
              NameValuePair param =
                  response
                      .getEntity()
                      .getContentType()
                      .getElements()[0]
                      .getParameterByName("charset");
              if (param != null) {
                encoding = param.getValue();
              }
            }
            NamedList<Object> resp = client.parser.processResponse(rspBody, encoding);
            NamedList<Object> error = (NamedList<Object>) resp.get("error");
            if (error != null) {
              metadata = (NamedList<String>) error.get("metadata");
              String remoteMsg = (String) error.get("msg");
              if (remoteMsg != null) {
                msg.append("\nRemote error message: ");
                msg.append(remoteMsg);
              }
            }
          }
        } catch (Exception exc) {
          // don't want to fail to report error if parsing the response fails
          log.warn("Failed to parse error response from {} due to: ", client.getBaseURL(), exc);
        } finally {
          // Built in finally so the exception exists even when parsing the body failed.
          solrExc =
              new SolrClient.RemoteSolrException(
                  client.getBaseURL(), statusCode, msg.toString(), null);
          if (metadata != null) {
            solrExc.setMetadata(metadata);
          }
        }
        handleError(solrExc);
      } else {
        onSuccess(response);
      }
      stallDetection.incrementProcessedCount();
    } finally {
      try {
        if (response != null) {
          Utils.consumeFully(response.getEntity());
        }
      } catch (Exception e) {
        log.error("Error consuming and closing http response stream.", e);
      }
      notifyQueueAndRunnersIfEmptyQueue();
    }
  }
}