in src/main/java/org/opensearch/search/asynchronous/management/AsynchronousSearchManagementService.java [193:237]
public final void performCleanUp() {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
ImmutableOpenMap<String, DiscoveryNode> dataNodes = clusterService.state().nodes().getDataNodes();
List<DiscoveryNode> nodes = Stream.of(dataNodes.values().toArray(DiscoveryNode.class))
.collect(Collectors.toList());
if (nodes == null || nodes.isEmpty()) {
logger.debug("Found empty data nodes with asynchronous search enabled attribute [{}] for response clean up", dataNodes);
return;
}
int pos = Randomness.get().nextInt(nodes.size());
DiscoveryNode randomNode = nodes.get(pos);
transportService.sendRequest(randomNode, PERSISTED_RESPONSE_CLEANUP_ACTION_NAME,
new AsynchronousSearchCleanUpRequest(threadPool.absoluteTimeInMillis()),
new TransportResponseHandler<AcknowledgedResponse>() {
@Override
public AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}
@Override
public void handleResponse(AcknowledgedResponse response) {
logger.debug("Successfully executed clean up action on node [{}] with response [{}]", randomNode,
response.isAcknowledged());
}
@Override
public void handleException(TransportException e) {
logger.error(() -> new ParameterizedMessage("Exception executing action [{}]",
PERSISTED_RESPONSE_CLEANUP_ACTION_NAME), e);
}
@Override
public String executor() {
return AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME;
}
});
} catch (Exception ex) {
logger.error("Failed to schedule asynchronous search cleanup", ex);
}
}