public void updateKeepAliveAndGetContext()

in src/main/java/org/opensearch/search/asynchronous/service/AsynchronousSearchService.java [489:563]


    public void updateKeepAliveAndGetContext(String id, TimeValue keepAlive, AsynchronousSearchContextId asynchronousSearchContextId,
                                             User user, ActionListener<AsynchronousSearchContext> listener) {
        ActionListener<AsynchronousSearchContext> exceptionTranslationWrapper = getExceptionTranslationWrapper(id, listener);
        validateKeepAlive(keepAlive);
        long requestedExpirationTime = currentTimeSupplier.getAsLong() + keepAlive.getMillis();
        // find an active context on this node if one exists
        Optional<AsynchronousSearchActiveContext> asynchronousSearchContextOptional = asynchronousSearchActiveStore
                .getContext(asynchronousSearchContextId);
        // for all other stages we don't really care much as those contexts are destined to be discarded
        if (asynchronousSearchContextOptional.isPresent()) {
            AsynchronousSearchActiveContext asynchronousSearchActiveContext = asynchronousSearchContextOptional.get();
            asynchronousSearchActiveContext.acquireContextPermitIfRequired(wrap(
                    releasable -> {
                        ActionListener<AsynchronousSearchContext> releasableActionListener = runAfter(exceptionTranslationWrapper,
                                releasable::close);
                        // At this point it's possible that the response would have been persisted to system index
                        if (asynchronousSearchActiveContext.isAlive() == false && asynchronousSearchActiveContext.keepOnCompletion()) {
                            logger.debug("Updating persistence store after state is PERSISTED asynchronous search id [{}] " +
                                    "for updating context", asynchronousSearchActiveContext.getAsynchronousSearchId());
                            persistenceService.updateExpirationTime(id, requestedExpirationTime, user, wrap(
                                    (actionResponse) ->
                                            releasableActionListener.onResponse(new AsynchronousSearchPersistenceContext(id,
                                                    asynchronousSearchContextId,
                                                    actionResponse, currentTimeSupplier, namedWriteableRegistry)),
                                    releasableActionListener::onFailure));
                        } else {
                            if (isUserValid(user, asynchronousSearchActiveContext.getUser())) {
                                logger.debug("Updating persistence store: NO as state is NOT PERSISTED yet asynchronous search id [{}] " +
                                        "for updating context", asynchronousSearchActiveContext.getAsynchronousSearchId());
                                asynchronousSearchActiveContext.setExpirationTimeMillis(requestedExpirationTime);
                                releasableActionListener.onResponse(asynchronousSearchActiveContext);
                            } else {
                                releasableActionListener.onFailure(
                                        new OpenSearchSecurityException("User doesn't have necessary roles to access the " +
                                                "asynchronous search with id " + id, RestStatus.FORBIDDEN));
                            }
                        }
                    },
                    exception -> {
                        Throwable cause = ExceptionsHelper.unwrapCause(exception);
                        if (cause instanceof TimeoutException) {
                            // this should ideally not happen. This would mean we couldn't acquire permits within the timeout
                            logger.debug(() -> new ParameterizedMessage("Failed to acquire permits for " +
                                    "asynchronous search id [{}] for updating context within timeout 5s",
                                    asynchronousSearchActiveContext.getAsynchronousSearchId()), exception);
                            listener.onFailure(new OpenSearchTimeoutException(id));
                        } else {
                            // best effort we try an update the doc if one exists
                            if (asynchronousSearchActiveContext.keepOnCompletion()) {
                                logger.debug(
                                        "Updating persistence store after failing to acquire permits for asynchronous search id [{}] for " +
                                                "updating context with expiration time [{}]", asynchronousSearchActiveContext
                                                .getAsynchronousSearchId(),
                                        requestedExpirationTime);
                                persistenceService.updateExpirationTime(id, requestedExpirationTime, user,
                                        wrap((actionResponse) -> exceptionTranslationWrapper.onResponse(
                                                new AsynchronousSearchPersistenceContext(id, asynchronousSearchContextId,
                                                        actionResponse, currentTimeSupplier, namedWriteableRegistry)),
                                                exceptionTranslationWrapper::onFailure));
                            } else {
                                exceptionTranslationWrapper.onFailure(new ResourceNotFoundException(
                                        asynchronousSearchActiveContext.getAsynchronousSearchId()));
                            }
                        }
                    }), TimeValue.timeValueSeconds(5), "update keep alive");
        } else {
            // try update the doc on the index assuming there exists one.
            logger.debug("Updating persistence store after active context evicted for asynchronous search id [{}] " +
                    "for updating context", id);
            persistenceService.updateExpirationTime(id, requestedExpirationTime, user,
                    wrap((actionResponse) -> exceptionTranslationWrapper.onResponse(new AsynchronousSearchPersistenceContext(
                                    id, asynchronousSearchContextId, actionResponse, currentTimeSupplier, namedWriteableRegistry)),
                            exceptionTranslationWrapper::onFailure));
        }
    }