in src/main/java/org/opensearch/search/asynchronous/service/AsynchronousSearchService.java [489:563]
public void updateKeepAliveAndGetContext(String id, TimeValue keepAlive, AsynchronousSearchContextId asynchronousSearchContextId,
User user, ActionListener<AsynchronousSearchContext> listener) {
ActionListener<AsynchronousSearchContext> exceptionTranslationWrapper = getExceptionTranslationWrapper(id, listener);
validateKeepAlive(keepAlive);
long requestedExpirationTime = currentTimeSupplier.getAsLong() + keepAlive.getMillis();
// find an active context on this node if one exists
Optional<AsynchronousSearchActiveContext> asynchronousSearchContextOptional = asynchronousSearchActiveStore
.getContext(asynchronousSearchContextId);
// for all other stages we don't really care much as those contexts are destined to be discarded
if (asynchronousSearchContextOptional.isPresent()) {
AsynchronousSearchActiveContext asynchronousSearchActiveContext = asynchronousSearchContextOptional.get();
asynchronousSearchActiveContext.acquireContextPermitIfRequired(wrap(
releasable -> {
ActionListener<AsynchronousSearchContext> releasableActionListener = runAfter(exceptionTranslationWrapper,
releasable::close);
// At this point it's possible that the response would have been persisted to system index
if (asynchronousSearchActiveContext.isAlive() == false && asynchronousSearchActiveContext.keepOnCompletion()) {
logger.debug("Updating persistence store after state is PERSISTED asynchronous search id [{}] " +
"for updating context", asynchronousSearchActiveContext.getAsynchronousSearchId());
persistenceService.updateExpirationTime(id, requestedExpirationTime, user, wrap(
(actionResponse) ->
releasableActionListener.onResponse(new AsynchronousSearchPersistenceContext(id,
asynchronousSearchContextId,
actionResponse, currentTimeSupplier, namedWriteableRegistry)),
releasableActionListener::onFailure));
} else {
if (isUserValid(user, asynchronousSearchActiveContext.getUser())) {
logger.debug("Updating persistence store: NO as state is NOT PERSISTED yet asynchronous search id [{}] " +
"for updating context", asynchronousSearchActiveContext.getAsynchronousSearchId());
asynchronousSearchActiveContext.setExpirationTimeMillis(requestedExpirationTime);
releasableActionListener.onResponse(asynchronousSearchActiveContext);
} else {
releasableActionListener.onFailure(
new OpenSearchSecurityException("User doesn't have necessary roles to access the " +
"asynchronous search with id " + id, RestStatus.FORBIDDEN));
}
}
},
exception -> {
Throwable cause = ExceptionsHelper.unwrapCause(exception);
if (cause instanceof TimeoutException) {
// this should ideally not happen. This would mean we couldn't acquire permits within the timeout
logger.debug(() -> new ParameterizedMessage("Failed to acquire permits for " +
"asynchronous search id [{}] for updating context within timeout 5s",
asynchronousSearchActiveContext.getAsynchronousSearchId()), exception);
listener.onFailure(new OpenSearchTimeoutException(id));
} else {
// best effort we try an update the doc if one exists
if (asynchronousSearchActiveContext.keepOnCompletion()) {
logger.debug(
"Updating persistence store after failing to acquire permits for asynchronous search id [{}] for " +
"updating context with expiration time [{}]", asynchronousSearchActiveContext
.getAsynchronousSearchId(),
requestedExpirationTime);
persistenceService.updateExpirationTime(id, requestedExpirationTime, user,
wrap((actionResponse) -> exceptionTranslationWrapper.onResponse(
new AsynchronousSearchPersistenceContext(id, asynchronousSearchContextId,
actionResponse, currentTimeSupplier, namedWriteableRegistry)),
exceptionTranslationWrapper::onFailure));
} else {
exceptionTranslationWrapper.onFailure(new ResourceNotFoundException(
asynchronousSearchActiveContext.getAsynchronousSearchId()));
}
}
}), TimeValue.timeValueSeconds(5), "update keep alive");
} else {
// try update the doc on the index assuming there exists one.
logger.debug("Updating persistence store after active context evicted for asynchronous search id [{}] " +
"for updating context", id);
persistenceService.updateExpirationTime(id, requestedExpirationTime, user,
wrap((actionResponse) -> exceptionTranslationWrapper.onResponse(new AsynchronousSearchPersistenceContext(
id, asynchronousSearchContextId, actionResponse, currentTimeSupplier, namedWriteableRegistry)),
exceptionTranslationWrapper::onFailure));
}
}