in persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java [920:989]
public boolean save(final Item item, final Boolean useBatchingOption, final Boolean alwaysOverwriteOption) {
final boolean useBatching = useBatchingOption == null ? this.useBatchingForSave : useBatchingOption;
final boolean alwaysOverwrite = alwaysOverwriteOption == null ? this.alwaysOverwrite : alwaysOverwriteOption;
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
protected Boolean execute(Object... args) throws Exception {
try {
String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
String itemType = item.getItemType();
if (item instanceof CustomItem) {
itemType = ((CustomItem) item).getCustomItemType();
}
String documentId = getDocumentIDForItemType(item.getItemId(), itemType);
String index = item.getSystemMetadata("index") != null ? (String) item.getSystemMetadata("index") : getIndex(itemType);
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(documentId);
indexRequest.source(source, XContentType.JSON);
if (!alwaysOverwrite) {
Long seqNo = (Long) item.getSystemMetadata(SEQ_NO);
Long primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM);
if (seqNo != null && primaryTerm != null) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.opType(DocWriteRequest.OpType.CREATE);
}
}
if (routingByType.containsKey(itemType)) {
indexRequest.routing(routingByType.get(itemType));
}
try {
if (bulkProcessor == null || !useBatching) {
indexRequest.setRefreshPolicy(getRefreshPolicy(itemType));
IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
String responseIndex = response.getIndex();
String itemId = response.getId();
setMetadata(item, itemId, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), responseIndex);
// Special handling for session, in case of new session we check that a rollover happen or not to update the latest available index
if (Session.ITEM_TYPE.equals(itemType) &&
sessionLatestIndex != null &&
response.getResult().equals(DocWriteResponse.Result.CREATED) &&
!responseIndex.equals(sessionLatestIndex)) {
sessionLatestIndex = responseIndex;
}
} else {
bulkProcessor.add(indexRequest);
}
logMetadataItemOperation("saved", item);
} catch (IndexNotFoundException e) {
LOGGER.error("Could not find index {}, could not register item type {} with id {} ", index, itemType, item.getItemId(), e);
return false;
}
return true;
} catch (IOException e) {
throw new Exception("Error saving item " + item, e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}