in plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java [229:261]
/**
 * Updates the document of an ML task in the task index.
 *
 * <p>The given fields are copied into a new map, {@code LAST_UPDATE_TIME_FIELD} is set to the
 * current epoch milliseconds, and the update is sent with an immediate refresh policy.
 *
 * <p>When the cached task exposes an update semaphore, a permit is acquired before the update is
 * issued and released via {@code ActionListener.runAfter} once {@code listener} has been notified.
 * Every failure path either runs before the permit is acquired or goes through the releasing
 * listener, so a failed call does not keep the permit.
 *
 * <p>{@code listener.onFailure} is called (and no update is sent) when:
 * <ul>
 *   <li>the task id is not present in the task cache;</li>
 *   <li>{@code updatedFields} is null or empty;</li>
 *   <li>the semaphore cannot be acquired within {@code timeoutInMillis};</li>
 *   <li>the calling thread is interrupted while waiting for the semaphore.</li>
 * </ul>
 *
 * @param taskId id of the task to update; must be present in the task cache
 * @param updatedFields fields to write to the task document; must be non-null and non-empty
 * @param listener notified with the update response or the failure
 * @param timeoutInMillis maximum time to wait for the per-task update semaphore, in milliseconds
 */
public void updateMLTask(
    String taskId,
    Map<String, Object> updatedFields,
    ActionListener<UpdateResponse> listener,
    long timeoutInMillis
) {
    if (!taskCaches.containsKey(taskId)) {
        listener.onFailure(new RuntimeException("Can't find task"));
        return;
    }
    // Validate before touching the semaphore so that an invalid request never holds a permit.
    if (updatedFields == null || updatedFields.isEmpty()) {
        listener.onFailure(new IllegalArgumentException("Updated fields is null or empty"));
        return;
    }
    Semaphore semaphore = taskCaches.get(taskId).getUpdateTaskIndexSemaphore();
    try {
        if (semaphore != null && !semaphore.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS)) {
            listener.onFailure(new RuntimeException("Other updating request not finished yet"));
            return;
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag for callers further up the stack.
        Thread.currentThread().interrupt();
        log.error("Failed to acquire semaphore for task " + taskId, e);
        listener.onFailure(e);
        // No permit was acquired: stop here so we neither send the update nor release a permit we do not hold.
        return;
    }

    // From here on a permit may be held; every exit must go through releasingListener.
    ActionListener<UpdateResponse> releasingListener;
    if (semaphore == null) {
        releasingListener = listener;
    } else {
        releasingListener = ActionListener.runAfter(listener, semaphore::release);
    }

    try {
        Map<String, Object> updatedContent = new HashMap<>(updatedFields);
        updatedContent.put(LAST_UPDATE_TIME_FIELD, Instant.now().toEpochMilli());

        UpdateRequest updateRequest = new UpdateRequest(ML_TASK_INDEX, taskId);
        updateRequest.doc(updatedContent);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

        client.update(updateRequest, releasingListener);
    } catch (Exception e) {
        // A synchronous failure while building or dispatching the request would otherwise
        // leave the permit held and the caller without any notification.
        log.error("Failed to update task " + taskId, e);
        releasingListener.onFailure(e);
    }
}