in persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java [1935:2057]
private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final String customItemType, final int offset, final int size, final String[] routing, final String scrollTimeValidity) {
return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".query", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
List<T> results = new ArrayList<T>();
String scrollIdentifier = null;
long totalHits = 0;
PartialList.Relation totalHitsRelation = PartialList.Relation.EQUAL;
try {
String itemType = Item.getItemType(clazz);
if (customItemType != null) {
itemType = customItemType;
}
TimeValue keepAlive = TimeValue.timeValueHours(1);
SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.fetchSource(true)
.seqNoAndPrimaryTerm(true)
.query(wrapWithItemTypeQuery(itemType, query))
.size(size < 0 ? defaultQueryLimit : size)
.from(offset);
if (scrollTimeValidity != null) {
keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueHours(1), "scrollTimeValidity");
searchRequest.scroll(keepAlive);
}
if (size == Integer.MIN_VALUE) {
searchSourceBuilder.size(defaultQueryLimit);
} else if (size != -1) {
searchSourceBuilder.size(size);
} else {
// size == -1, use scroll query to retrieve all the results
searchRequest.scroll(keepAlive);
}
if (routing != null) {
searchRequest.routing(routing);
}
if (sortBy != null) {
String[] sortByArray = sortBy.split(",");
for (String sortByElement : sortByArray) {
if (sortByElement.startsWith("geo:")) {
String[] elements = sortByElement.split(":");
GeoDistanceSortBuilder distanceSortBuilder = SortBuilders.geoDistanceSort(elements[1], Double.parseDouble(elements[2]), Double.parseDouble(elements[3])).unit(DistanceUnit.KILOMETERS);
if (elements.length > 4 && elements[4].equals("desc")) {
searchSourceBuilder.sort(distanceSortBuilder.order(SortOrder.DESC));
} else {
searchSourceBuilder.sort(distanceSortBuilder.order(SortOrder.ASC));
}
} else {
String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement, ":"), itemType);
if (name != null) {
if (sortByElement.endsWith(":desc")) {
searchSourceBuilder.sort(name, SortOrder.DESC);
} else {
searchSourceBuilder.sort(name, SortOrder.ASC);
}
} else {
// in the case of no data existing for the property, we will not add the sorting to the request.
}
}
}
}
searchSourceBuilder.version(true);
searchRequest.source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
if (size == -1) {
// Scroll until no more hits are returned
while (true) {
for (SearchHit searchHit : response.getHits().getHits()) {
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId());
searchScrollRequest.scroll(keepAlive);
response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
// If we have no more hits, exit
if (response.getHits().getHits().length == 0) {
break;
}
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
} else {
SearchHits searchHits = response.getHits();
scrollIdentifier = response.getScrollId();
totalHits = searchHits.getTotalHits().value;
totalHitsRelation = getTotalHitsRelation(searchHits.getTotalHits());
if (scrollIdentifier != null && totalHits == 0) {
// we have no results, we must clear the scroll request immediately.
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
}
for (SearchHit searchHit : searchHits) {
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex());
results.add(value);
}
}
} catch (Exception t) {
throw new Exception("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);
}
PartialList<T> result = new PartialList<T>(results, offset, size, totalHits, totalHitsRelation);
if (scrollIdentifier != null && totalHits != 0) {
result.setScrollIdentifier(scrollIdentifier);
result.setScrollTimeValidity(scrollTimeValidity);
}
return result;
}
}.catchingExecuteInClassLoader(true);
}