public final void performCleanUp()

in src/main/java/org/opensearch/search/asynchronous/management/AsynchronousSearchManagementService.java [193:237]


    public final void performCleanUp() {
        final ThreadContext threadContext = threadPool.getThreadContext();
        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
            // we have to execute under the system context so that if security is enabled the sync is authorized
            threadContext.markAsSystemContext();
            ImmutableOpenMap<String, DiscoveryNode> dataNodes = clusterService.state().nodes().getDataNodes();
            List<DiscoveryNode> nodes = Stream.of(dataNodes.values().toArray(DiscoveryNode.class))
                    .collect(Collectors.toList());
            if (nodes == null || nodes.isEmpty()) {
                logger.debug("Found empty data nodes with asynchronous search enabled attribute [{}] for response clean up", dataNodes);
                return;
            }
            int pos = Randomness.get().nextInt(nodes.size());
            DiscoveryNode randomNode = nodes.get(pos);
            transportService.sendRequest(randomNode, PERSISTED_RESPONSE_CLEANUP_ACTION_NAME,
                    new AsynchronousSearchCleanUpRequest(threadPool.absoluteTimeInMillis()),
                    new TransportResponseHandler<AcknowledgedResponse>() {

                        @Override
                        public AcknowledgedResponse read(StreamInput in) throws IOException {
                            return new AcknowledgedResponse(in);
                        }

                        @Override
                        public void handleResponse(AcknowledgedResponse response) {
                            logger.debug("Successfully executed clean up action on node [{}] with response [{}]", randomNode,
                                    response.isAcknowledged());
                        }

                        @Override
                        public void handleException(TransportException e) {
                            logger.error(() -> new ParameterizedMessage("Exception executing action [{}]",
                                    PERSISTED_RESPONSE_CLEANUP_ACTION_NAME), e);
                        }

                        @Override
                        public String executor() {
                            return AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME;
                        }
                    });

        } catch (Exception ex) {
            logger.error("Failed to schedule asynchronous search cleanup", ex);
        }
    }