in src/main/java/org/opensearch/search/asynchronous/service/AsynchronousSearchPersistenceService.java [227:289]
public void updateExpirationTime(String id, long expirationTimeMillis,
                                 User user, ActionListener<AsynchronousSearchPersistenceModel> listener) {
    // Sets the expiration time of the persisted asynchronous search document with the given id and
    // reports the resulting document (or a failure) through the listener. The listener is completed
    // exactly once on every path below.
    //
    //   id                   - document id in ASYNC_SEARCH_RESPONSE_INDEX
    //   expirationTimeMillis - value written to the expiration time field
    //   user                 - when null a plain partial-document update is issued; otherwise the update
    //                          is guarded by a script comparing backend roles
    //   listener             - receives the updated model, ResourceNotFoundException when the index or
    //                          document is missing, or a FORBIDDEN OpenSearchSecurityException when the
    //                          scripted role check rejects the update
    if (indexExists() == false) {
        listener.onFailure(new ResourceNotFoundException(id));
        return;
    }
    UpdateRequest updateRequest = new UpdateRequest(ASYNC_SEARCH_RESPONSE_INDEX, id);
    updateRequest.retryOnConflict(5);
    if (user == null) {
        Map<String, Object> source = new HashMap<>();
        source.put(EXPIRATION_TIME_MILLIS, expirationTimeMillis);
        updateRequest.doc(source, XContentType.JSON);
    } else {
        // The script writes the new expiration time only when the stored document has no user / no
        // backend roles, or when the caller's backend roles contain all of the stored ones. Otherwise it
        // sets ctx.op = 'none', which surfaces below as a NOOP result.
        String scriptCode = "if (ctx._source.user == null || ctx._source.user.backend_roles == null || " +
                "(params.backend_roles != null && params.backend_roles.containsAll(ctx._source.user.backend_roles))) " +
                "{ ctx._source.expiration_time_millis = params.expiration_time_millis } else { ctx.op = 'none' }";
        Map<String, Object> params = new HashMap<>();
        params.put(BACKEND_ROLES, user.getBackendRoles());
        params.put(EXPIRATION_TIME_MILLIS, expirationTimeMillis);
        Script conditionalUpdateScript = new Script(ScriptType.INLINE, "painless", scriptCode, params);
        updateRequest.script(conditionalUpdateScript);
    }
    // Ask for the source so the response can be turned into a persistence model without a second read.
    updateRequest.fetchSource(FetchSourceContext.FETCH_SOURCE);
    client.update(updateRequest, ActionListener.wrap(updateResponse -> {
        switch (updateResponse.getResult()) {
            case NOOP:
                if (user != null) {
                    // With a user the request is scripted, and the script's else-branch (role check
                    // failed) is what turns the operation into a no-op.
                    listener.onFailure(new OpenSearchSecurityException(
                            "User doesn't have necessary roles to access the asynchronous search with id " + id,
                            RestStatus.FORBIDDEN));
                } else {
                    // Partial-document update that changed nothing; still answer with the current document.
                    listener.onResponse(persistenceModelFromSource(updateResponse.getGetResult().getSource()));
                }
                break;
            case UPDATED:
                listener.onResponse(persistenceModelFromSource(updateResponse.getGetResult().getSource()));
                break;
            case NOT_FOUND:
            case DELETED:
                logger.debug("Update Result [{}] for id [{}], expiration time requested, [{}]",
                        updateResponse.getResult(), id, expirationTimeMillis);
                listener.onFailure(new ResourceNotFoundException(id));
                break;
            default:
                // Any other result used to leave the listener uncompleted, stalling the caller.
                logger.error("Unexpected update result [{}] for id [{}], expiration time requested, [{}]",
                        updateResponse.getResult(), id, expirationTimeMillis);
                listener.onFailure(new IllegalStateException("Unexpected update result ["
                        + updateResponse.getResult() + "] for asynchronous search id [" + id + "]"));
                break;
        }
    }, exception -> {
        final Throwable cause = ExceptionsHelper.unwrapCause(exception);
        if (cause instanceof DocumentMissingException) {
            listener.onFailure(new ResourceNotFoundException(id));
        } else {
            logger.error(() -> new ParameterizedMessage("Exception occurred updating expiration time for asynchronous search [{}]",
                    id), exception);
            // The listener only accepts Exception; wrap any other Throwable.
            listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
        }
    }));
}

/**
 * Builds a persistence model from the source map returned with an update response.
 * Numeric fields are read through {@link Number} so that either an {@code Integer} or a {@code Long}
 * value in the map is accepted.
 *
 * @param source source map of the updated document
 * @return the model built from the start time, expiration time, response, error and user entries
 */
@SuppressWarnings("unchecked")
private AsynchronousSearchPersistenceModel persistenceModelFromSource(Map<String, Object> source) {
    long startTimeMillis = ((Number) source.get(START_TIME_MILLIS)).longValue();
    long storedExpirationTimeMillis = ((Number) source.get(EXPIRATION_TIME_MILLIS)).longValue();
    return new AsynchronousSearchPersistenceModel(startTimeMillis, storedExpirationTimeMillis,
            (String) source.get(RESPONSE), (String) source.get(ERROR),
            parseUser((Map<String, Object>) source.get(USER)));
}