in src/main/java/org/opensearch/search/asynchronous/service/AsynchronousSearchPersistenceService.java [297:326]
/**
 * Removes every document in {@code ASYNC_SEARCH_RESPONSE_INDEX} whose {@code EXPIRATION_TIME_MILLIS}
 * value is less than or equal to the supplied cut-off, using a delete-by-query request.
 *
 * <p>Outcome reported to the listener:
 * <ul>
 *   <li>{@code AcknowledgedResponse(true)} when the index does not exist, or when the delete-by-query
 *       response carries neither bulk failures nor search failures;</li>
 *   <li>{@code AcknowledgedResponse(false)} when the response carries at least one bulk or search failure;</li>
 *   <li>{@code onFailure} with the unwrapped cause when the request itself fails. A cause that is not an
 *       {@link Exception} is wrapped in a {@code NotSerializableExceptionWrapper} first.</li>
 * </ul>
 *
 * @param listener               receives the acknowledgement or the failure
 * @param expirationTimeInMillis inclusive upper bound for the expiration time of the responses to delete
 */
public void deleteExpiredResponses(ActionListener<AcknowledgedResponse> listener, long expirationTimeInMillis) {
    // Nothing has ever been persisted if the index is missing, so there is nothing to clean up.
    if (indexExists() == false) {
        logger.debug("Async search index not yet created! Nothing to delete.");
        listener.onResponse(new AcknowledgedResponse(true));
        return;
    }

    DeleteByQueryRequest request = new DeleteByQueryRequest(ASYNC_SEARCH_RESPONSE_INDEX);
    request.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_MILLIS).lte(expirationTimeInMillis));

    client.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
        bulkByScrollResponse -> {
            final boolean noBulkFailures = bulkByScrollResponse.getBulkFailures() == null
                || bulkByScrollResponse.getBulkFailures().isEmpty();
            final boolean noSearchFailures = bulkByScrollResponse.getSearchFailures() == null
                || bulkByScrollResponse.getSearchFailures().isEmpty();

            if (noBulkFailures && noSearchFailures) {
                logger.debug("Successfully deleted expired responses");
                listener.onResponse(new AcknowledgedResponse(true));
            } else {
                // Partial failures are reported as "not acknowledged" rather than as an exception.
                logger.error("Failed to delete expired asynchronous search responses with bulk failures[{}] / "
                        + "search failures [{}]",
                    bulkByScrollResponse.getBulkFailures(), bulkByScrollResponse.getSearchFailures());
                listener.onResponse(new AcknowledgedResponse(false));
            }
        },
        failure -> {
            logger.error(
                () -> new ParameterizedMessage("Failed to delete expired response for expiration time {}",
                    expirationTimeInMillis),
                failure);
            // The listener only accepts Exception, so any other Throwable has to be wrapped.
            final Throwable rootCause = ExceptionsHelper.unwrapCause(failure);
            if (rootCause instanceof Exception) {
                listener.onFailure((Exception) rootCause);
            } else {
                listener.onFailure(new NotSerializableExceptionWrapper(rootCause));
            }
        }
    ));
}