in src/main/java/org/opensearch/knn/indices/ModelDao.java [250:327]
/**
 * Writes {@code model} to the model index and then asks for it to be removed from the model cache.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Build the document source from the model id, the model metadata and, when present, the
 *       base64-encoded model blob.</li>
 *   <li>Create the model index first if {@code isCreated()} reports that it does not exist.</li>
 *   <li>Index the document under the model id with the supplied op type and an immediate refresh.</li>
 *   <li>If the model state is {@code CREATED}, route the index response through the listener returned by
 *       {@code getUpdateModelMetadataListener}; otherwise skip that step.</li>
 *   <li>Execute {@code RemoveModelFromCacheAction} for the model id and complete {@code listener}.</li>
 * </ol>
 *
 * @param model         model to persist; must not be {@code null}
 * @param listener      receives the {@code IndexResponse} when the cache removal reports no failures;
 *                      receives a failure if index creation, indexing or cache removal fails
 * @param requestOpType op type set on the index request; it decides whether an existing document may be
 *                      overwritten
 * @throws IllegalArgumentException if {@code model} is {@code null}, or if the model blob is {@code null}
 *                                  while the model state is {@code CREATED}
 * @throws IOException              declared by the signature; no statement in this method throws it
 *                                  directly, presumably it originates from {@code create} (TODO confirm)
 */
private void putInternal(Model model, ActionListener<IndexResponse> listener,
        DocWriteRequest.OpType requestOpType) throws IOException {
    if (model == null) {
        throw new IllegalArgumentException("Model cannot be null");
    }

    final ModelMetadata modelMetadata = model.getModelMetadata();
    // Read once so the document id and the cache removal request are guaranteed to use the same id.
    final String modelId = model.getModelID();

    // A plain HashMap instead of double-brace initialization: the latter creates an anonymous HashMap
    // subclass that keeps a hidden reference to the enclosing instance. HashMap (rather than an
    // immutable factory) is kept because it permits null values, which description/error may be
    // (TODO confirm against ModelMetadata).
    final Map<String, Object> parameters = new HashMap<>();
    parameters.put(KNNConstants.MODEL_ID, modelId);
    parameters.put(KNNConstants.KNN_ENGINE, modelMetadata.getKnnEngine().getName());
    parameters.put(KNNConstants.METHOD_PARAMETER_SPACE_TYPE, modelMetadata.getSpaceType().getValue());
    parameters.put(KNNConstants.DIMENSION, modelMetadata.getDimension());
    parameters.put(KNNConstants.MODEL_STATE, modelMetadata.getState().getName());
    parameters.put(KNNConstants.MODEL_TIMESTAMP, modelMetadata.getTimestamp());
    parameters.put(KNNConstants.MODEL_DESCRIPTION, modelMetadata.getDescription());
    parameters.put(KNNConstants.MODEL_ERROR, modelMetadata.getError());

    final boolean isCreatedState = ModelState.CREATED.equals(modelMetadata.getState());

    final byte[] modelBlob = model.getModelBlob();
    if (modelBlob == null && isCreatedState) {
        throw new IllegalArgumentException("Model binary cannot be null when model state is CREATED");
    }

    // The blob is optional for every state other than CREATED; only store it when there is one.
    if (modelBlob != null) {
        parameters.put(KNNConstants.MODEL_BLOB_PARAMETER, Base64.getEncoder().encodeToString(modelBlob));
    }

    final IndexRequestBuilder indexRequestBuilder = client.prepareIndex(MODEL_INDEX_NAME, "_doc");
    indexRequestBuilder.setId(modelId);
    indexRequestBuilder.setSource(parameters);
    // The op type decides whether this request is allowed to update an existing document.
    indexRequestBuilder.setOpType(requestOpType);
    indexRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

    // Last stage: once the document (and, if applicable, the metadata) has been written, request removal
    // of this model id from the cache. The caller is only told about success when that removal reports
    // no failures; otherwise the aggregated failure message is surfaced.
    final ActionListener<IndexResponse> onMetaListener = ActionListener.wrap(
        indexResponse -> client.execute(
            RemoveModelFromCacheAction.INSTANCE,
            new RemoveModelFromCacheRequest(modelId),
            ActionListener.wrap(
                removeModelFromCacheResponse -> {
                    if (removeModelFromCacheResponse.hasFailures()) {
                        String failureMessage = buildRemoveModelErrorMessage(modelId,
                            removeModelFromCacheResponse);
                        listener.onFailure(new RuntimeException(failureMessage));
                        return;
                    }
                    listener.onResponse(indexResponse);
                },
                listener::onFailure
            )
        ),
        listener::onFailure
    );

    // After the model is indexed, update metadata only if the model is in CREATED state.
    final ActionListener<IndexResponse> onIndexListener = isCreatedState
        ? getUpdateModelMetadataListener(modelMetadata, onMetaListener)
        : onMetaListener;

    // Create the model index if it does not already exist, and index the document once that completes.
    if (!isCreated()) {
        create(ActionListener.wrap(
            createIndexResponse -> indexRequestBuilder.execute(onIndexListener),
            onIndexListener::onFailure
        ));
        return;
    }

    indexRequestBuilder.execute(onIndexListener);
}