in connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedCloudSolrClient.java [742:914]
/**
 * Sends {@code request} via {@code sendRequest} and, when the call fails, decides whether the locally cached
 * collection state should be refreshed and the request re-issued.
 *
 * <p>For non-admin, non per-collection-V2 requests that name at least one collection, the client-side state version
 * of every resolved collection is passed to the server in the {@code STATE_VERSION} parameter (only when the
 * request's params are a {@link ModifiableSolrParams}). If the response carries a {@code STATE_VERSION} map as its
 * last entry, that entry is removed from the response and the listed collections are re-fetched.
 *
 * <p>On failure the method recurses with {@code retryCount + 1} when either a communication error (or a 503
 * {@code RouteException}) occurred, or the cached state turned out to be stale, as long as
 * {@code retryCount < MAX_STALE_RETRIES}. Admin requests, requests without collections and non-GET V2 requests are
 * never retried.
 *
 * @param request the request to send; when it implements {@code V2RequestSupport} it is replaced by its V2 request
 * @param retryCount number of retries already performed for this request
 * @param inputCollections names of the collections (or aliases) the request targets; must not be {@code null}
 * @return the response returned by {@code sendRequest}, or by the retried invocation
 * @throws SolrServerException if the request fails and is not (or no longer) retried
 * @throws IOException if the request fails with an I/O error and is not (or no longer) retried
 */
protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest<?> request, final int retryCount, final List<String> inputCollections) throws SolrServerException, IOException {
  connect(); // important to call this before you start working with the ZkStateReader
  // build up a _stateVer_ param to pass to the server containing all of the
  // external collection state versions involved in this request, which allows
  // the server to notify us that our cached state for one or more of the external
  // collections is stale and needs to be refreshed ... this code has no impact on internal
  // collections
  String stateVerParam = null;
  List<DocCollection> requestedCollections = null;
  boolean isCollectionRequestOfV2 = false;
  if (request instanceof V2RequestSupport) {
    request = ((V2RequestSupport) request).getV2Request();
  }
  if (request instanceof V2Request) {
    isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
  }
  final boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
  final boolean isUpdate = (request instanceof IsUpdateRequest) && (request instanceof ModifiedUpdateRequest);
  if (!inputCollections.isEmpty() && !isAdmin && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for admin, v2 api requests
    final Set<String> requestedCollectionNames = resolveAliases(inputCollections, isUpdate);
    StringBuilder stateVerParamBuilder = null;
    for (final String requestedCollection : requestedCollectionNames) {
      // track the version of state we're using on the client side using the _stateVer_ param
      final DocCollection coll = getDocCollection(requestedCollection, null);
      if (coll == null) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
      }
      final int collVer = coll.getZNodeVersion();
      if (requestedCollections == null) {
        requestedCollections = new ArrayList<>(requestedCollectionNames.size());
      }
      requestedCollections.add(coll);
      if (stateVerParamBuilder == null) {
        stateVerParamBuilder = new StringBuilder();
      } else {
        stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
      }
      stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
    }
    if (stateVerParamBuilder != null) {
      stateVerParam = stateVerParamBuilder.toString();
    }
  }
  if (request.getParams() instanceof ModifiableSolrParams) {
    final ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
    if (stateVerParam != null) {
      params.set(STATE_VERSION, stateVerParam);
    } else {
      // make sure a value left over from a previous attempt is not sent again
      params.remove(STATE_VERSION);
    }
  } // else: ??? how to set this ???
  NamedList<Object> resp = null;
  try {
    resp = sendRequest(request, inputCollections);
    // to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from
    // there
    final Object o = resp == null || resp.size() == 0 ? null : resp.get(STATE_VERSION, resp.size() - 1);
    if (o instanceof Map) { // instanceof is already false for null
      // remove this because no one else needs this and tests would fail if they are comparing
      // responses
      resp.remove(resp.size() - 1);
      final Map<?, ?> invalidStates = (Map<?, ?>) o;
      for (final Map.Entry<?, ?> e : invalidStates.entrySet()) {
        // re-fetch each collection the server reported, passing the version it expects
        getDocCollection((String) e.getKey(), (Integer) e.getValue());
      }
    }
  } catch (final Exception exc) {
    final Throwable rootCause = SolrException.getRootCause(exc);
    // don't do retry support for admin requests
    // or if the request doesn't have a collection specified
    // or request is v2 api and its method is not GET
    if (inputCollections.isEmpty() || isAdmin || (request instanceof V2Request && request.getMethod() != SolrRequest.METHOD.GET)) {
      if (exc instanceof SolrServerException) {
        throw (SolrServerException) exc;
      } else if (exc instanceof IOException) {
        throw (IOException) exc;
      } else if (exc instanceof RuntimeException) {
        throw (RuntimeException) exc;
      } else {
        throw new SolrServerException(rootCause);
      }
    }
    final int errorCode = (rootCause instanceof SolrException) ? ((SolrException) rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
    final boolean wasCommError = (rootCause instanceof ConnectException || rootCause instanceof SocketException || wasCommError(rootCause));
    // 503 (service unavailable) from a RouteException is treated like a communication error here;
    // 404 is handled further below together with INVALID_STATE.
    // TODO there are other reasons for 404. We need to change the solr response format from HTML
    // to structured data to know that
    if (wasCommError || (exc instanceof RouteException && (errorCode == 503))) {
      // it was a communication error. it is likely that
      // the node to which the request to be sent is down . So , expire the state
      // so that the next attempt would fetch the fresh state
      // just re-read state for all of them, if it has not been retried
      // in retryExpiryTime time
      if (requestedCollections != null) {
        for (final DocCollection ext : requestedCollections) {
          final ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(ext.getName());
          if (cacheEntry == null) {
            continue;
          }
          cacheEntry.maybeStale = true;
        }
      }
      if (retryCount < MAX_STALE_RETRIES) {
        // we may hold a stale version of the collection state and could not get any information
        // from the server, so try again - but only a bounded number of times, because the state
        // may simply not have been updated yet
        log.info("Request to collection {} failed due to ({}) {}, retry={} maxRetries={} commError={} errorCode={} - retrying", inputCollections, errorCode, rootCause, retryCount, MAX_STALE_RETRIES,
            wasCommError, errorCode);
        return requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
      }
    } else {
      log.info("request was not communication error it seems");
    }
    log.info("Request to collection {} failed due to ({}) {}, retry={} maxRetries={} commError={} errorCode={} ", inputCollections, errorCode, rootCause, retryCount, MAX_STALE_RETRIES, wasCommError,
        errorCode);
    boolean stateWasStale = false;
    if (retryCount < MAX_STALE_RETRIES && requestedCollections != null && !requestedCollections.isEmpty()
        && (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404)) {
      // cached state for one or more external collections was stale
      // re-issue request using updated state
      stateWasStale = true;
      // just re-read state for all of them, which is a little heavy handed but hopefully a rare
      // occurrence
      for (final DocCollection ext : requestedCollections) {
        collectionStateCache.remove(ext.getName());
      }
    }
    // if we experienced a communication error, it's worth checking the state
    // with ZK just to make sure the node we're trying to hit is still part of the collection
    if (retryCount < MAX_STALE_RETRIES && !stateWasStale && requestedCollections != null && !requestedCollections.isEmpty() && wasCommError) {
      for (final DocCollection ext : requestedCollections) {
        final DocCollection latestStateFromZk = getDocCollection(ext.getName(), null);
        if (latestStateFromZk == null) {
          // getDocCollection yields null when the collection cannot be resolved (see the
          // "Collection not found" check above). There is no newer state to retry with, so skip it
          // instead of failing with a NullPointerException that would hide the original error.
          continue;
        }
        if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
          // looks like we couldn't reach the server because the state was stale == retry
          stateWasStale = true;
          // we just pulled state from ZK, so update the cache so that the retry uses it
          collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
        }
      }
    }
    if (requestedCollections != null) {
      requestedCollections.clear(); // done with this
    }
    // if the state was stale, then we retry the request once with new state pulled from Zk
    if (stateWasStale) {
      log.warn("Re-trying request to collection(s) {} after stale state error from server.", inputCollections);
      resp = requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
    } else {
      if (exc instanceof SolrException || exc instanceof SolrServerException || exc instanceof IOException) {
        // exc is final, so the compiler narrows this rethrow to the types the try block can throw
        throw exc;
      } else {
        throw new SolrServerException(rootCause);
      }
    }
  }
  return resp;
}