in src/main/java/org/opensearch/search/asynchronous/service/AsynchronousSearchService.java [330:408]
private void cancelAndFreeActiveAndPersistedContext(AsynchronousSearchActiveContext asynchronousSearchContext,
ActionListener<Boolean> listener, User user) {
// if there are no context found to be cleaned up we throw a ResourceNotFoundException
AtomicReference<Releasable> releasableReference = new AtomicReference<>(() -> {});
ActionListener<Boolean> releasableListener = runAfter(listener, releasableReference.get()::close);
GroupedActionListener<Boolean> groupedDeletionListener = new GroupedActionListener<>(
wrap((responses) -> {
if (responses.stream().anyMatch(r -> r)) {
logger.debug("Free context for asynchronous search [{}] successful ",
asynchronousSearchContext.getAsynchronousSearchId());
releasableListener.onResponse(true);
} else {
logger.debug("Freeing context, asynchronous search [{}] not found ",
asynchronousSearchContext.getAsynchronousSearchId());
releasableListener.onFailure(new ResourceNotFoundException(asynchronousSearchContext.getAsynchronousSearchId()));
}
}, releasableListener::onFailure), 2);
//We get a true or a ResourceNotFound from persistence layer. We want to translate it to either a true/false or any other exception
//that should be surfaced up
ActionListener<Boolean> translatedListener = wrap(
groupedDeletionListener::onResponse, (ex) -> {
if (ex instanceof ResourceNotFoundException) {
groupedDeletionListener.onResponse(false);
} else {
logger.debug(() -> new ParameterizedMessage("Translating exception, received for asynchronous search [{}]",
asynchronousSearchContext.getAsynchronousSearchId()), ex);
groupedDeletionListener.onFailure(ex);
}
});
String triggeredBy = user != null ? (" by user [" + user + "]") : "";
String cancelTaskReason = "Delete asynchronous search [" + asynchronousSearchContext.getAsynchronousSearchId()
+ "] has been triggered" + triggeredBy + ". Attempting to cancel in-progress search task";
//Intent of the lock here is to disallow ongoing migration to system index
// as if that is underway we might end up creating a new document post a DELETE was executed
asynchronousSearchContext.acquireContextPermitIfRequired(wrap(
releasable -> {
releasableReference.set(releasable);
if (asynchronousSearchContext.keepOnCompletion()) {
handleCancelTaskPermitAcquired(asynchronousSearchContext, groupedDeletionListener, cancelTaskReason);
logger.debug("Deleting asynchronous search id [{}] from system index ",
asynchronousSearchContext.getAsynchronousSearchId());
persistenceService.deleteResponse(asynchronousSearchContext.getAsynchronousSearchId(), user, translatedListener);
} else { //keep on completion is false. simply cancel task and clean up active context
handleCancelTaskPermitAcquired(asynchronousSearchContext, wrap(r -> {
if (r) {
releasableListener.onResponse(true);
} else {
releasableListener.onFailure(
new ResourceNotFoundException(asynchronousSearchContext.getAsynchronousSearchId()));
}
}, releasableListener::onFailure), cancelTaskReason);
}
}, exception -> {
Throwable cause = ExceptionsHelper.unwrapCause(exception);
if (cause instanceof TimeoutException) {
// this should ideally not happen. This would mean we couldn't acquire permits within the timeout
logger.debug(() -> new ParameterizedMessage("Failed to acquire permits for " +
"asynchronous search id [{}] for updating context within timeout 5s",
asynchronousSearchContext.getAsynchronousSearchId()), exception);
listener.onFailure(new OpenSearchTimeoutException(asynchronousSearchContext.getAsynchronousSearchId()));
} else {
// best effort clean up with acknowledged as false
if (asynchronousSearchContext.keepOnCompletion()) {
handleCancelTaskPermitAcquisitionFailed(asynchronousSearchContext, groupedDeletionListener, cancelTaskReason,
exception);
logger.debug("Deleting asynchronous search id [{}] from system index ",
asynchronousSearchContext.getAsynchronousSearchId());
persistenceService.deleteResponse(asynchronousSearchContext.getAsynchronousSearchId(),
user, translatedListener);
} else {
handleCancelTaskPermitAcquisitionFailed(asynchronousSearchContext, releasableListener, cancelTaskReason,
exception);
}
}
}
), TimeValue.timeValueSeconds(5), "free context");
}