private void cancelAndFreeActiveAndPersistedContext()

in src/main/java/org/opensearch/search/asynchronous/service/AsynchronousSearchService.java [330:408]


    /**
     * Cancels the in-progress search task and frees the given active context and, when the context is marked
     * {@code keepOnCompletion}, also deletes the persisted response from the system index.
     * <p>
     * A context permit is requested first (5 second timeout). If the permit is acquired it is released again
     * once {@code listener} has been notified.
     * <ul>
     *     <li>{@code keepOnCompletion == true}: the active-context clean-up and the persisted-response deletion
     *     each contribute one boolean; {@code listener} receives {@code true} if at least one of them is
     *     {@code true}, otherwise a {@link ResourceNotFoundException}.</li>
     *     <li>{@code keepOnCompletion == false}: only the active context is cleaned up; a {@code false} outcome
     *     is reported as a {@link ResourceNotFoundException}.</li>
     *     <li>Permit acquisition timed out: {@code listener} fails with an {@link OpenSearchTimeoutException}.</li>
     *     <li>Permit acquisition failed for any other reason: a best-effort clean-up is attempted without the
     *     permit.</li>
     * </ul>
     *
     * @param asynchronousSearchContext the active context to cancel and free
     * @param listener                  notified with the outcome of the clean-up
     * @param user                      the user triggering the delete; may be {@code null}, in which case it is
     *                                  omitted from the cancellation reason
     */
    private void cancelAndFreeActiveAndPersistedContext(AsynchronousSearchActiveContext asynchronousSearchContext,
                                                        ActionListener<Boolean> listener, User user) {
        // Holds the permit once acquired; starts as a no-op so that paths which never acquire the permit
        // can still go through releasableListener safely.
        AtomicReference<Releasable> releasableReference = new AtomicReference<>(() -> {});
        // The reference must be dereferenced lazily, when the listener completes. A bound method reference
        // (releasableReference.get()::close) would capture the initial no-op at this point and the permit
        // set later would never be released.
        ActionListener<Boolean> releasableListener = runAfter(listener, () -> releasableReference.get().close());
        // Expects exactly two responses: one from the active context clean-up and one from the persistence layer.
        // If neither found anything to clean up, a ResourceNotFoundException is reported to the caller.
        GroupedActionListener<Boolean> groupedDeletionListener = new GroupedActionListener<>(
                wrap((responses) -> {
                    if (responses.stream().anyMatch(r -> r)) {
                        logger.debug("Free context for asynchronous search [{}] successful ",
                                asynchronousSearchContext.getAsynchronousSearchId());
                        releasableListener.onResponse(true);
                    } else {
                        logger.debug("Freeing context, asynchronous search [{}] not found ",
                                asynchronousSearchContext.getAsynchronousSearchId());
                        releasableListener.onFailure(new ResourceNotFoundException(asynchronousSearchContext.getAsynchronousSearchId()));
                    }
                }, releasableListener::onFailure), 2);

        // We get a true or a ResourceNotFoundException from the persistence layer. Translate the latter to false so
        // that it counts as a "not found" vote in the grouped listener; any other exception is surfaced as a failure.
        ActionListener<Boolean> translatedListener = wrap(
                groupedDeletionListener::onResponse, (ex) -> {
                    if (ex instanceof ResourceNotFoundException) {
                        groupedDeletionListener.onResponse(false);
                    } else {
                        logger.debug(() -> new ParameterizedMessage("Translating exception, received for asynchronous search [{}]",
                                asynchronousSearchContext.getAsynchronousSearchId()), ex);
                        groupedDeletionListener.onFailure(ex);
                    }
                });
        String triggeredBy = user != null ? (" by user [" + user + "]") : "";
        String cancelTaskReason = "Delete asynchronous search [" + asynchronousSearchContext.getAsynchronousSearchId()
                + "] has been triggered" + triggeredBy + ". Attempting to cancel in-progress search task";
        // Intent of the lock here is to disallow an ongoing migration to the system index:
        // if that is underway we might end up creating a new document after a DELETE was executed.
        asynchronousSearchContext.acquireContextPermitIfRequired(wrap(
                releasable -> {
                    // Publish the permit so that it is released after the caller's listener has been notified.
                    releasableReference.set(releasable);
                    if (asynchronousSearchContext.keepOnCompletion()) {
                        handleCancelTaskPermitAcquired(asynchronousSearchContext, groupedDeletionListener, cancelTaskReason);
                        logger.debug("Deleting asynchronous search id [{}] from system index ",
                                asynchronousSearchContext.getAsynchronousSearchId());
                        persistenceService.deleteResponse(asynchronousSearchContext.getAsynchronousSearchId(), user, translatedListener);
                    } else { // keep on completion is false. simply cancel task and clean up active context
                        handleCancelTaskPermitAcquired(asynchronousSearchContext, wrap(r -> {
                            if (r) {
                                releasableListener.onResponse(true);
                            } else {
                                releasableListener.onFailure(
                                        new ResourceNotFoundException(asynchronousSearchContext.getAsynchronousSearchId()));
                            }
                        }, releasableListener::onFailure), cancelTaskReason);
                    }
                }, exception -> {
                    Throwable cause = ExceptionsHelper.unwrapCause(exception);
                    if (cause instanceof TimeoutException) {
                        // this should ideally not happen. This would mean we couldn't acquire permits within the timeout
                        logger.debug(() -> new ParameterizedMessage("Failed to acquire permits for " +
                                "asynchronous search id [{}] for updating context within timeout 5s",
                                asynchronousSearchContext.getAsynchronousSearchId()), exception);
                        // No permit was acquired on this path, so there is nothing to release.
                        listener.onFailure(new OpenSearchTimeoutException(asynchronousSearchContext.getAsynchronousSearchId()));
                    } else {
                        // best effort clean up with acknowledged as false
                        if (asynchronousSearchContext.keepOnCompletion()) {

                            handleCancelTaskPermitAcquisitionFailed(asynchronousSearchContext, groupedDeletionListener, cancelTaskReason,
                                    exception);
                            logger.debug("Deleting asynchronous search id [{}] from system index ",
                                    asynchronousSearchContext.getAsynchronousSearchId());
                            persistenceService.deleteResponse(asynchronousSearchContext.getAsynchronousSearchId(),
                                    user, translatedListener);
                        } else {
                            handleCancelTaskPermitAcquisitionFailed(asynchronousSearchContext, releasableListener, cancelTaskReason,
                                    exception);
                        }
                    }
                }
        ), TimeValue.timeValueSeconds(5), "free context");
    }