in src/main/java/org/opensearch/knn/training/TrainingJobRunner.java [123:171]
/**
 * Submits {@code trainingJob} to the {@code TRAIN_THREAD_POOL} executor.
 *
 * <p>On the worker thread the job is run and the resulting model is serialized. Whether the run succeeds
 * or fails, {@code jobCount} is decremented and a {@code semaphore} permit is released exactly once.
 * NOTE(review): this assumes the caller incremented {@code jobCount} and acquired a permit before calling;
 * confirm against the caller.
 *
 * <p>If the executor rejects the job, the model metadata is marked {@link ModelState#FAILED} with an
 * explanatory error and that failure is serialized. The slot is released and {@code TRAINING_ERRORS}
 * is incremented in all cases.
 *
 * @param trainingJob the job to run; its model id is used in all log messages
 */
private void train(TrainingJob trainingJob) {
    // Listener for the asynchronous model-update index action: log success at debug level,
    // log failures and count them as training errors.
    ActionListener<IndexResponse> loggingListener = ActionListener.wrap(
        indexResponse -> logger.debug("[KNN] Model serialization update for \"" +
            trainingJob.getModelId() + "\" was successful"),
        e -> {
            logger.error("[KNN] Model serialization update for \"" + trainingJob.getModelId() +
                "\" failed: " + e.getMessage(), e);
            KNNCounter.TRAINING_ERRORS.increment();
        }
    );

    // Attempt to submit the job to the training thread pool. On rejection, release the resources
    // and serialize the failure.
    try {
        threadPool.executor(TRAIN_THREAD_POOL).execute(() -> {
            try {
                trainingJob.run();
                serializeModel(trainingJob, loggingListener, true);
            } catch (IOException e) {
                logger.error("Unable to serialize model \"" + trainingJob.getModelId() + "\": " + e.getMessage(), e);
                KNNCounter.TRAINING_ERRORS.increment();
            } catch (Exception e) {
                // NOTE(review): no FAILED state is serialized from here when run() throws; confirm that
                // TrainingJob.run() records its own failures in the model metadata.
                logger.error("Unable to complete training for \"" + trainingJob.getModelId() + "\": "
                    + e.getMessage(), e);
                KNNCounter.TRAINING_ERRORS.increment();
            } finally {
                // Always free the training slot, whatever happened above.
                jobCount.decrementAndGet();
                semaphore.release();
            }
        });
    } catch (RejectedExecutionException ree) {
        // Everything that can throw sits inside the try so that the slot release in finally is
        // guaranteed. Previously, a failure while updating the metadata leaked the permit and the job count.
        try {
            logger.error("Unable to train model \"" + trainingJob.getModelId() + "\": " + ree.getMessage(), ree);
            ModelMetadata modelMetadata = trainingJob.getModel().getModelMetadata();
            modelMetadata.setState(ModelState.FAILED);
            modelMetadata.setError("Training job execution was rejected. Node's training queue is at capacity.");
            serializeModel(trainingJob, loggingListener, true);
        } catch (IOException ioe) {
            logger.error("Unable to serialize the failure for model \"" + trainingJob.getModelId() + "\": " + ioe, ioe);
        } finally {
            jobCount.decrementAndGet();
            semaphore.release();
            KNNCounter.TRAINING_ERRORS.increment();
        }
    }
}