in src/main/java/org/opensearch/knn/training/TrainingJob.java [109:202]
public void run() {
    // Runs the training job in three phases:
    //   1. fetch the training data allocation from the native memory cache and read-lock it,
    //   2. reserve the allocation that holds space for the model and read-lock it,
    //   3. train through JNIService and store the resulting blob on the model.
    // An Exception in any phase marks the model FAILED, stores an error message on the model metadata and
    // increments the TRAINING_ERRORS counter. Each phase has its own try/catch because a lock needs to be
    // released after it is acquired, but cannot be released if it has not been acquired.
    NativeMemoryAllocation trainingDataAllocation = null;
    NativeMemoryAllocation modelAnonymousAllocation = null;
    ModelMetadata modelMetadata = model.getModelMetadata();

    try {
        // Get training data
        trainingDataAllocation = nativeMemoryCacheManager.get(trainingDataEntryContext, false);

        // Acquire lock on allocation -- this will wait until training data is loaded
        trainingDataAllocation.readLock();
    } catch (Exception e) {
        // Hand the exception itself to the logger so the stack trace and cause chain are kept;
        // getMessage() alone may be null and does not show where the failure originated.
        logger.error("Failed to get training data for model \"" + modelId + "\": " + e.getMessage(), e);
        modelMetadata.setState(ModelState.FAILED);
        modelMetadata.setError("Failed to load training data into memory. " +
                "Check if there is enough memory to perform the request.");

        // Only evict the entry if the cache actually handed back an allocation.
        if (trainingDataAllocation != null) {
            nativeMemoryCacheManager.invalidate(trainingDataEntryContext.getKey());
        }

        KNNCounter.TRAINING_ERRORS.increment();
        return;
    }

    try {
        // Reserve space in the cache for the model
        modelAnonymousAllocation = nativeMemoryCacheManager.get(modelAnonymousEntryContext, false);

        // Lock until training completes
        modelAnonymousAllocation.readLock();
    } catch (Exception e) {
        logger.error("Failed to allocate space in native memory for model \"" + modelId + "\": " + e.getMessage(), e);
        modelMetadata.setState(ModelState.FAILED);
        modelMetadata.setError("Failed to allocate space in native memory for the model. " +
                "Check if there is enough memory to perform the request.");

        // The training data lock was acquired in the first phase, so it has to be released here
        // before its cache entry is evicted.
        trainingDataAllocation.readUnlock();
        nativeMemoryCacheManager.invalidate(trainingDataEntryContext.getKey());

        if (modelAnonymousAllocation != null) {
            nativeMemoryCacheManager.invalidate(modelAnonymousEntryContext.getKey());
        }

        KNNCounter.TRAINING_ERRORS.increment();
        return;
    }

    // Both read locks are held from this point on; the finally block below releases them.
    try {
        // We need to check if either allocation is closed before we proceed. There is a possibility that
        // immediately after the cache returns the allocation, it will grab the write lock and close them before
        // this method can get the read lock.
        if (modelAnonymousAllocation.isClosed()) {
            throw new RuntimeException("Unable to reserve memory for model: allocation is already closed");
        }

        if (trainingDataAllocation.isClosed()) {
            throw new RuntimeException("Unable to load training data into memory: allocation is already closed");
        }

        Map<String, Object> trainParameters = modelMetadata.getKnnEngine().getMethodAsMap(knnMethodContext);
        trainParameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(
                KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY));

        byte[] modelBlob = JNIService.trainIndex(
                trainParameters,
                modelMetadata.getDimension(),
                trainingDataAllocation.getMemoryAddress(),
                modelMetadata.getKnnEngine().getName()
        );

        // Once training finishes, update model
        model.setModelBlob(modelBlob);
        modelMetadata.setState(ModelState.CREATED);
    } catch (Exception e) {
        logger.error("Failed to run training job for model \"" + modelId + "\": " + e.getMessage(), e);
        modelMetadata.setState(ModelState.FAILED);
        modelMetadata.setError("Failed to execute training. May be caused by an invalid method definition or " +
                "not enough memory to perform training.");

        KNNCounter.TRAINING_ERRORS.increment();
    } finally {
        // Invalidate right away so we don't run into any big memory problems. Both locks are released
        // before either entry is evicted; this order is kept deliberately.
        trainingDataAllocation.readUnlock();
        modelAnonymousAllocation.readUnlock();
        nativeMemoryCacheManager.invalidate(trainingDataEntryContext.getKey());
        nativeMemoryCacheManager.invalidate(modelAnonymousEntryContext.getKey());
    }
}