in jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java [461:759]
/**
 * Schedules the work for the inputs of the current batch, starting at {@code currentInputId}.
 * <p>
 * For every input binding, the requested slice is computed from:
 * <ul>
 *   <li>the service OFFSET/LIMIT,</li>
 *   <li>a shift by the bindings already served for the current input,</li>
 *   <li>a cap by the result set size limit of the target service, applied only if that limit is exact.</li>
 * </ul>
 * The slice is then split into ranges that are present in the cache and ranges that are absent.
 * Present ranges are registered as iterators that read from the cache. Absent ranges are combined
 * into a single batched backend request whose execution is deferred.
 *
 * @param bypassCacheOnFirstInput if true, the complete requested range of the first input is fetched
 *        from the backend regardless of the cache. The bindings already served for that input
 *        ({@code currentInputIdBindingsServed}) are then skipped on the backend iterator instead of
 *        via the request offset.
 */
public void prepareNextBatchExec(boolean bypassCacheOnFirstInput) {
    freeResources();

    Batch<Integer, PartitionRequest<Binding>> backendRequests = BatchImpl.forInteger();
    Estimate<Long> serviceDescription = resultSizeCache.getLimit(targetService);
    long resultSetLimit = serviceDescription.getValue();
    boolean isExact = serviceDescription.isExact(); // we interpret the limit as a lower bound if exact is false!
    // TODO If the result set limit is known then restrict the iterators to it

    int nextAllocOutputId = 0;
    int batchSize = inputs.size();

    // Set only if the first input was actually scheduled against the backend while bypassing the cache.
    // Only then do the leading backend bindings belong to that input, so only then may they be skipped.
    boolean skipServedBindingsOnBackend = false;

    if (logger.isInfoEnabled()) {
        logger.info("Schedule for current batch:");
    }

    int rangeId = currentRangeId;
    for (int inputId = currentInputId; inputId < batchSize; ++inputId) {
        boolean isFirstInput = inputId == currentInputId;

        // When the first input is resumed via the cache, skip the bindings already served
        // by shifting the requested range
        long displacement = isFirstInput && !bypassCacheOnFirstInput
                ? currentInputIdBindingsServed
                : 0L;

        Binding inputBinding = inputs.get(inputId);

        Slice<Binding[]> slice = null;
        Lock lock = null;
        RefFuture<ServiceCacheValue> cacheValueRef = null;

        if (cache != null) {
            ServiceCacheKey cacheKey = cacheKeyFactory.createCacheKey(inputBinding);

            // Note: cacheValueRef must be closed as part of the iterators that read from the cache
            cacheValueRef = cache.getCache().claim(cacheKey);
            try {
                ServiceCacheValue serviceCacheValue = cacheValueRef.await();
                slice = serviceCacheValue.getSlice();
                if (CacheMode.CLEAR.equals(cacheMode)) {
                    slice.clear();
                }
                lock = slice.getReadWriteLock().readLock();
            } catch (RuntimeException e) {
                // No iterator has taken over the claim yet - release it here, otherwise it is never released
                try {
                    cacheValueRef.close();
                } catch (RuntimeException closeException) {
                    e.addSuppressed(closeException);
                }
                throw e;
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Created cache key: " + cacheKey);
            }

            // Lock the existing cache entry so we can read out the loaded ranges
            lock.lock();
        }

        try {
            RangeSet<Long> loadedRanges;
            long knownSize;
            if (slice != null) {
                loadedRanges = slice.getLoadedRanges();
                knownSize = slice.getKnownSize();
            } else {
                loadedRanges = TreeRangeSet.create();
                knownSize = -1;
            }

            Range<Long> requestedRange = computeRequestedRange(
                    serviceInfo.getOffset(), serviceInfo.getLimit(), displacement,
                    isExact, resultSetLimit, knownSize);

            // Map each sub-range of the requested range to whether it is present in the cache
            RangeMap<Long, Boolean> allRanges = TreeRangeMap.create();
            if (bypassCacheOnFirstInput && isFirstInput) {
                // Fetch everything from the backend. The bindings already served
                // ('currentInputIdBindingsServed') are skipped on the backend iterator below.
                allRanges.put(requestedRange, false);
                skipServedBindingsOnBackend = !requestedRange.isEmpty();
            } else {
                RangeSet<Long> presentRanges = loadedRanges.subRangeSet(requestedRange);
                RangeSet<Long> absentRanges = loadedRanges.complement().subRangeSet(requestedRange);
                presentRanges.asRanges().forEach(r -> allRanges.put(r, true));
                absentRanges.asRanges().forEach(r -> allRanges.put(r, false));
            }

            // If the beginning of the requested range is covered by the cache then serve from it;
            // a global limit may then prevent having to fire a backend request at all.
            // However, as soon as we have to fire a backend request we need to ensure we don't serve
            // more data than the seen result set limit.
            // If the size of the data can be greater than that limit then:
            // - We need to start the backend request from the request offset
            // - The issue is how to handle the next binding
            if (logger.isInfoEnabled()) {
                logger.info("input " + inputId + ": " +
                        allRanges.toString()
                            .replace("false", "fetch")
                            .replace("true", "cached"));
            }

            Map<Range<Long>, Boolean> mapOfRanges = allRanges.asMapOfRanges();
            if (mapOfRanges.isEmpty()) {
                // Special case if it is known that there are no bindings:
                // Register an empty iterator for (inputId, rangeId) and
                // release the reference to the cache entry immediately
                if (cacheValueRef != null) {
                    cacheValueRef.close();
                }
                SliceKey sliceKey = new SliceKey(inputId, rangeId);
                QueryIterPeek it = QueryIterPeek.create(new QueryIterNullIterator(execCxt), execCxt);
                sliceKeyToIter.put(sliceKey, it);
                sliceKeyToClose.add(sliceKey);
                inputToRangeToOutput.put(inputId, rangeId, nextAllocOutputId);
                outputToSliceKey.put(nextAllocOutputId, sliceKey);
                ++rangeId;
                ++nextAllocOutputId;
            } else {
                // Count the cached ranges. The iterator over the LAST cached range (not the last range
                // overall, which may be a backend fetch) releases the claim on the cache entry.
                int pendingCacheReads = 0;
                for (boolean loaded : mapOfRanges.values()) {
                    if (loaded) {
                        ++pendingCacheReads;
                    }
                }
                boolean usesCacheRead = pendingCacheReads > 0;

                RefFuture<ServiceCacheValue> finalCacheValueRef = cacheValueRef;
                for (Entry<Range<Long>, Boolean> rangeEntry : mapOfRanges.entrySet()) {
                    SliceKey sliceKey = new SliceKey(inputId, rangeId);
                    Range<Long> range = rangeEntry.getKey();
                    boolean isLoaded = rangeEntry.getValue();
                    long lo = range.lowerEndpoint();
                    long hi = range.hasUpperBound() ? range.upperEndpoint() : Long.MAX_VALUE;
                    long lim = hi == Long.MAX_VALUE ? Long.MAX_VALUE : hi - lo;

                    if (isLoaded) {
                        --pendingCacheReads;
                        boolean releasesCacheEntry = pendingCacheReads == 0 && finalCacheValueRef != null;

                        // Accessor will be closed via channel below
                        SliceAccessor<Binding[]> accessor = slice.newSliceAccessor();

                        // Prevent eviction of the scheduled range
                        accessor.addEvictionGuard(Range.closedOpen(lo, hi));

                        // Create a channel over the accessor for sequential reading.
                        // Reading from the channel internally advances the range of data claimed by the accessor.
                        // Note: As an improvement the eviction guard could be managed by the channel
                        // so that data is immediately released after read.
                        // Channel will be closed via baseIt below.
                        ReadableChannel<Binding[]> channel =
                                new ReadableChannelWithLimit<>(
                                        new ReadableChannelOverSliceAccessor<>(accessor, lo),
                                        lim);

                        IteratorCloseable<Binding> baseIt = new IteratorOverReadableChannel<>(channel.getArrayOps(), channel, 1024 * 4);

                        // The iterator over the last cached range also unclaims the cache entry
                        Runnable cacheEntryCloseAction = releasesCacheEntry
                                ? () -> {
                                    baseIt.close();
                                    finalCacheValueRef.close();
                                }
                                : baseIt::close;

                        // Bridge the cache iterator to jena
                        QueryIterator qIterA = QueryIterPlainWrapper.create(Iter.onClose(baseIt, cacheEntryCloseAction), execCxt);

                        Map<Var, Var> normedToScoped = serviceInfo.getVisibleSubOpVarsNormedToScoped();
                        qIterA = new QueryIteratorMapped(qIterA, normedToScoped);

                        // Add a pseudo idxVar mapping
                        final long idxVarValue = nextAllocOutputId;
                        QueryIterConvert qIterB = new QueryIterConvert(qIterA, b ->
                                BindingFactory.binding(b, idxVar, NodeFactoryExtra.intToNode(idxVarValue)), execCxt);

                        QueryIterPeek it = QueryIterPeek.create(qIterB, execCxt);

                        sliceKeyToIter.put(sliceKey, it);
                        sliceKeyToClose.add(sliceKey);
                    } else {
                        PartitionRequest<Binding> request = new PartitionRequest<>(nextAllocOutputId, inputBinding, lo, lim);
                        backendRequests.put(nextAllocOutputId, request);
                        sliceKeysForBackend.add(sliceKey);
                    }

                    inputToRangeToOutput.put(inputId, rangeId, nextAllocOutputId);
                    outputToSliceKey.put(nextAllocOutputId, sliceKey);

                    ++rangeId;
                    ++nextAllocOutputId;
                }

                // Close the reference to the cache entry; QueryIterCaching will manage
                // claim/unclaim in batches so we don't need to leave the reference open here
                if (!usesCacheRead && finalCacheValueRef != null) {
                    finalCacheValueRef.close();
                }
            }
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }

        // Only the first input resumes at currentRangeId; all subsequent inputs start at range 0
        rangeId = 0;
    }

    // Create a *deferred* remote execution if needed.
    // A limit on the query may cause the deferred execution to never run.
    if (!backendRequests.isEmpty()) {
        BatchQueryRewriteResult rewrite = batchQueryRewriter.rewrite(backendRequests);
        Op newSubOp = rewrite.getOp();

        OpService substitutedOp = new OpService(targetService, newSubOp, serviceInfo.getOpService().getSilent());

        // Execute the batch request and wrap it such that ...
        // (1) we can merge it with other backend and cache requests in the right order
        // (2) responses are written to the cache
        Supplier<QueryIterator> qIterSupplier = () -> opExecutor.exec(substitutedOp);

        QueryIterator qIter = new QueryIterDefer(qIterSupplier);

        // Wrap the iterator such that the items are cached
        if (cache != null) {
            qIter = new QueryIterWrapperCache(qIter, cacheBulkSize, cache, cacheKeyFactory, backendRequests, idxVar, targetService);
        }

        // Apply renaming after cache to avoid mismatch between op and bindings
        qIter = QueryIter.map(qIter, rewrite.getRenames());

        // Wrap the query iter such that we can peek the next binding in order
        // to decide from which iterator to take the next element
        SubIterator<Binding, QueryIterator> backendItPrimary = IteratorFactoryWithBuffer.wrap(qIter);
        IteratorOnClose<Binding> jenaIt = Iter.onClose(backendItPrimary, qIter::close);
        QueryIterator iter = QueryIterPlainWrapper.create(jenaIt, execCxt);

        QueryIterPeek frontIter = QueryIterPeek.create(iter, execCxt);

        // Register the iterator for the backend request with all corresponding outputIds
        for (Integer outputId : backendRequests.getItems().keySet()) {
            SliceKey sliceKey = outputToSliceKey.get(outputId);
            sliceKeyToIter.put(sliceKey, frontIter);
        }

        // The backend iterator is shared by all backend slice keys:
        // register it for closing only once, under the last one
        int lastOutputId = backendRequests.getItems().lastKey();
        SliceKey lastSliceKey = outputToSliceKey.get(lastOutputId);
        sliceKeyToClose.add(lastSliceKey);

        backendIt = backendItPrimary;

        if (skipServedBindingsOnBackend) {
            // If we come here then a number of bindings was handed to the client,
            // but then we weren't sure whether we could deliver any more w.r.t. the
            // backend result set size limit. The backend request for the first input starts at its
            // original offset, so consume as many bindings as were already handed to the client.
            for (long i = 0; i < currentInputIdBindingsServed; ++i) {
                if (backendIt.hasNext()) {
                    backendIt.next();
                }
            }
        }
    }
}

/**
 * Computes the range of result rows to request for a single input binding.
 *
 * @param offset the OFFSET of the service request; {@link Query#NOLIMIT} denotes absence of an offset
 * @param limit the LIMIT of the service request; a negative value denotes absence of a limit
 * @param displacement number of rows to skip in addition to the offset; also deducted from a finite limit
 * @param isExact whether {@code resultSetLimit} is exact; only then does it cap the requested range
 * @param resultSetLimit the result set size limit of the target service
 * @param knownSize the known size of the cached slice, or a negative value if unknown
 * @return the requested range. It is unbounded above if no upper bound applies, and closed-open
 *         otherwise. It is empty (rather than invalid) when no rows can be requested, e.g. because the
 *         offset lies beyond the known size or the displacement exhausts the limit.
 */
private static Range<Long> computeRequestedRange(long offset, long limit, long displacement,
        boolean isExact, long resultSetLimit, long knownSize) {
    long start = offset == Query.NOLIMIT ? 0 : offset;

    long effectiveLimit = limit < 0 ? Long.MAX_VALUE : limit;
    if (isExact) {
        effectiveLimit = Math.min(effectiveLimit, resultSetLimit);
    }

    if (displacement != 0) {
        start += displacement;
        if (effectiveLimit != Long.MAX_VALUE) {
            effectiveLimit -= displacement;
        }
    }

    long max = knownSize < 0 ? Long.MAX_VALUE : knownSize;
    long end = effectiveLimit == Long.MAX_VALUE ? max : LongMath.saturatedAdd(start, effectiveLimit);
    end = Math.min(end, max);

    // Range.closedOpen rejects an upper bound below the lower bound with IllegalArgumentException.
    // Clamp so that "nothing to request" becomes an empty range.
    end = Math.max(end, start);

    return end == Long.MAX_VALUE
            ? Range.atLeast(start)
            : Range.closedOpen(start, end);
}