public void run()

in src/main/java/org/opensearch/knn/training/TrainingJob.java [109:202]


    /**
     * Executes the training job.
     *
     * <p>Steps: obtain the training data allocation from the native memory cache and read-lock it, reserve a
     * native memory allocation for the model and read-lock it, train the model through
     * {@code JNIService.trainIndex}, then store the resulting blob on {@code model}.
     *
     * <p>Failures are not propagated to the caller. When a step fails:
     * <ul>
     *   <li>the model metadata is set to {@link ModelState#FAILED} with a user-facing error message;</li>
     *   <li>the failure is logged together with its cause;</li>
     *   <li>{@code KNNCounter.TRAINING_ERRORS} is incremented.</li>
     * </ul>
     * On success the metadata state becomes {@link ModelState#CREATED}. Once training has been attempted, both
     * cache entries are invalidated.
     */
    public void run() {
        NativeMemoryAllocation trainingDataAllocation = null;
        NativeMemoryAllocation modelAnonymousAllocation = null;
        ModelMetadata modelMetadata = model.getModelMetadata();

        try {
            // Get training data
            trainingDataAllocation = nativeMemoryCacheManager.get(trainingDataEntryContext, false);

            // Acquire lock on allocation -- this will wait until training data is loaded
            trainingDataAllocation.readLock();
        } catch (Exception e) {
            markFailed(modelMetadata, "Failed to get training data for model \"" + modelId + "\"",
                    "Failed to load training data into memory. " +
                            "Check if there is enough memory to perform the request.", e);

            // Only evict the entry if the cache actually handed one back to us.
            if (trainingDataAllocation != null) {
                nativeMemoryCacheManager.invalidate(trainingDataEntryContext.getKey());
            }
            return;
        }

        try {
            // Reserve space in the cache for the model
            modelAnonymousAllocation = nativeMemoryCacheManager.get(modelAnonymousEntryContext, false);

            // Lock until training completes
            modelAnonymousAllocation.readLock();
        } catch (Exception e) {
            markFailed(modelMetadata, "Failed to allocate space in native memory for model \"" + modelId + "\"",
                    "Failed to allocate space in native memory for the model. " +
                            "Check if there is enough memory to perform the request.", e);

            // The training data lock was acquired above, so it must be released before evicting the entry.
            trainingDataAllocation.readUnlock();
            nativeMemoryCacheManager.invalidate(trainingDataEntryContext.getKey());

            if (modelAnonymousAllocation != null) {
                nativeMemoryCacheManager.invalidate(modelAnonymousEntryContext.getKey());
            }
            return;
        }

        // Once locks are acquired, train the model. We need a separate try/catch block due to the fact that the lock
        // needs to be released after they are acquired, but cannot be released if it has not been acquired.
        try {
            // We need to check if either allocation is closed before we proceed. There is a possibility that
            // immediately after the cache returns the allocation, it will grab the write lock and close them before
            // this method can get the read lock.
            if (modelAnonymousAllocation.isClosed()) {
                throw new RuntimeException("Unable to reserve memory for model: allocation is already closed");
            }

            if (trainingDataAllocation.isClosed()) {
                throw new RuntimeException("Unable to load training data into memory: allocation is already closed");
            }

            Map<String, Object> trainParameters = modelMetadata.getKnnEngine().getMethodAsMap(knnMethodContext);
            trainParameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(
                    KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY));

            byte[] modelBlob = JNIService.trainIndex(
                    trainParameters,
                    modelMetadata.getDimension(),
                    trainingDataAllocation.getMemoryAddress(),
                    modelMetadata.getKnnEngine().getName()
            );

            // Once training finishes, update model
            model.setModelBlob(modelBlob);
            modelMetadata.setState(ModelState.CREATED);
        } catch (Exception e) {
            markFailed(modelMetadata, "Failed to run training job for model \"" + modelId + "\"",
                    "Failed to execute training. May be caused by an invalid method definition or " +
                            "not enough memory to perform training.", e);
        } finally {
            // Invalidate right away so we dont run into any big memory problems.
            // Locks are released first: per the isClosed() note above, closing an allocation takes its write
            // lock, which presumably cannot be obtained while this thread still holds a read lock.
            trainingDataAllocation.readUnlock();
            modelAnonymousAllocation.readUnlock();
            nativeMemoryCacheManager.invalidate(trainingDataEntryContext.getKey());
            nativeMemoryCacheManager.invalidate(modelAnonymousEntryContext.getKey());
        }
    }

    /**
     * Records a training failure.
     *
     * <p>Logs {@code logMessage} together with the exception's message and stack trace. Then moves the
     * metadata to {@link ModelState#FAILED}, stores {@code userError} as the model's error, and increments
     * {@code KNNCounter.TRAINING_ERRORS}.
     *
     * @param modelMetadata metadata of the model being trained
     * @param logMessage    log text, without the trailing exception message
     * @param userError     error message stored on the model metadata
     * @param e             the exception that caused the failure; passed to the logger so its cause is kept
     */
    private void markFailed(ModelMetadata modelMetadata, String logMessage, String userError, Exception e) {
        logger.error(logMessage + ": " + e.getMessage(), e);
        modelMetadata.setState(ModelState.FAILED);
        modelMetadata.setError(userError);
        KNNCounter.TRAINING_ERRORS.increment();
    }