in src/main/java/org/opensearch/search/asynchronous/processor/AsynchronousSearchPostProcessor.java [104:174]
/**
 * Persists the final response of an asynchronous search and then frees the active context.
 *
 * <p>The method asks the context for all of its permits (waiting at most 120 seconds, with the
 * reason {@code "persisting response"}). Once the permits are granted it re-checks
 * {@code shouldPersist()}; when that is {@code false} the permits are released and nothing is
 * stored. Otherwise the thread context is stashed and the persistence model is handed to the
 * persistence service. Depending on the outcome a {@link SearchResponsePersistedEvent} or a
 * {@link SearchResponsePersistFailedEvent} is triggered on the state machine, the context is
 * passed to {@code freeActiveContextConsumer}, and finally the permits are released.
 *
 * <p>If the permits cannot be acquired, the failure is logged and the context is passed to
 * {@code freeActiveContextConsumer} without persisting anything.
 *
 * @param asynchronousSearchContext the active context whose response is being persisted
 * @param persistenceModel          the model handed to the persistence service for storage
 */
public void persistResponse(AsynchronousSearchActiveContext asynchronousSearchContext,
                            AsynchronousSearchPersistenceModel persistenceModel) {
    // acquire all permits non-blocking
    asynchronousSearchContext.acquireAllContextPermits(ActionListener.wrap(releasable -> {
                // check again after acquiring the permits, the context may have been deleted in the meantime
                if (asynchronousSearchContext.shouldPersist() == false) {
                    logger.debug(
                            "Async search context [{}] has been closed while waiting to acquire permits for post processing",
                            asynchronousSearchContext.getAsynchronousSearchId());
                    releasable.close();
                    return;
                }
                logger.debug("Persisting response for asynchronous search id [{}]",
                        asynchronousSearchContext.getAsynchronousSearchId());
                // NOTE(review): ActionListener.wrap forwards exceptions thrown by this response handler to the
                // failure handler at the bottom of this method, which does not close the releasable. Confirm
                // that nothing below can throw synchronously before the listener has been handed off.
                try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
                    // runAfter releases the permits once either persistence callback has finished
                    asynchronousSearchPersistenceService.storeResponse(asynchronousSearchContext.getAsynchronousSearchId(),
                            persistenceModel, ActionListener.runAfter(ActionListener.wrap(
                                    (indexResponse) -> {
                                        // Mark any dangling reference as PERSISTED and clean it up from the IN_MEMORY context
                                        logger.debug("Successfully persisted response for asynchronous search id [{}]",
                                                asynchronousSearchContext.getAsynchronousSearchId());
                                        try {
                                            asynchronousSearchStateMachine.trigger(new SearchResponsePersistedEvent(
                                                    asynchronousSearchContext));
                                        } catch (AsynchronousSearchStateMachineClosedException ex) {
                                            // this should never happen since we had checked after acquiring all the
                                            // permits so a concurrent delete is not expected here, however an external
                                            // task cancellation can cause this
                                            logger.warn("Unexpected state, possibly caused by external task cancellation," +
                                                            " context with id [{}] closed while triggering event [{}]",
                                                    asynchronousSearchContext.getAsynchronousSearchId(),
                                                    SearchResponsePersistedEvent.class.getName());
                                        } finally {
                                            freeActiveContextConsumer.accept(asynchronousSearchContext);
                                        }
                                    },
                                    (e) -> {
                                        // Log first so the root cause is recorded even if the state transition or the
                                        // cleanup below throws. The exception is also passed as the throwable: when it
                                        // is only a placeholder argument the stack trace is not logged.
                                        logger.error(() -> new ParameterizedMessage(
                                                "Failed to persist final response for [{}] due to [{}]",
                                                asynchronousSearchContext.getAsynchronousSearchId(), e), e);
                                        try {
                                            asynchronousSearchStateMachine.trigger(new SearchResponsePersistFailedEvent(
                                                    asynchronousSearchContext));
                                        } catch (AsynchronousSearchStateMachineClosedException ex) {
                                            // this should never happen since we had checked after acquiring all the
                                            // permits so a concurrent delete is not expected here, however an external
                                            // task cancellation can cause this
                                            logger.warn("Unexpected state, possibly caused by external task cancellation," +
                                                            " context with id [{}] closed while triggering event [{}]",
                                                    asynchronousSearchContext.getAsynchronousSearchId(),
                                                    SearchResponsePersistFailedEvent.class.getName());
                                        } finally {
                                            freeActiveContextConsumer.accept(asynchronousSearchContext);
                                        }
                                    }
                            ), releasable::close));
                }
            }, (e) -> {
                // Failure to acquire the permits can happen either due to a TimeoutException or an
                // AsynchronousSearchContextClosedException; both are expected and logged at DEBUG, anything else at WARN.
                // If we weren't able to acquire permits we clean up the context to release heap.
                Throwable cause = ExceptionsHelper.unwrapCause(e);
                Level level = cause instanceof AsynchronousSearchContextClosedException || cause instanceof TimeoutException
                        ? Level.DEBUG : Level.WARN;
                logger.log(level, () -> new ParameterizedMessage("Exception occured while acquiring the permit for " +
                        "asynchronousSearchContext [{}]", asynchronousSearchContext.getAsynchronousSearchId()), e);
                freeActiveContextConsumer.accept(asynchronousSearchContext);
            }),
            TimeValue.timeValueSeconds(120), "persisting response");
}