public void prepareNextBatchExec()

in jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java [461:759]


    /**
     * Set up the iterators that serve the remaining inputs of the current batch.
     *
     * <p>After releasing previously held resources via {@code freeResources()}, every input
     * from {@code currentInputId} up to the end of {@code inputs} is processed as follows:
     * <ol>
     *   <li>If a cache is configured, the cache entry for the input binding is claimed and its
     *       slice is read-locked while the schedule for this input is computed.</li>
     *   <li>The requested range is derived from the offset/limit of {@code serviceInfo}, the
     *       exact result set limit (if known), the number of bindings already served for the
     *       first input and the known size of the slice.</li>
     *   <li>The requested range is partitioned into sub ranges that are present in the cache
     *       and sub ranges that are absent. For each present range an iterator reading from the
     *       cache is registered; for each absent range a {@code PartitionRequest} is added to
     *       the batch of backend requests.</li>
     * </ol>
     * If at least one backend request was collected, a single <em>deferred</em> bulk request is
     * created and registered for all of the corresponding output ids.
     *
     * @param bypassCacheOnFirstInput
     *          if {@code true}, the whole requested range of the first input is scheduled for
     *          fetching regardless of cached data, no displacement is applied to it, and up to
     *          {@code currentInputIdBindingsServed} bindings are consumed from the backend
     *          iterator right away because they were already handed to the client
     */
    public void prepareNextBatchExec(boolean bypassCacheOnFirstInput) {

        freeResources();

        Batch<Integer, PartitionRequest<Binding>> backendRequests = BatchImpl.forInteger();
        Estimate<Long> serviceDescription = resultSizeCache.getLimit(targetService);
        long resultSetLimit = serviceDescription.getValue();
        boolean isExact = serviceDescription.isExact(); // we interpret the limit as a lower bound if exact is false!

        // TODO If the result set limit is known then restrict the iterators to it

        int nextAllocOutputId = 0;
        int batchSize = inputs.size();

        if (logger.isInfoEnabled()) {
            logger.info("Schedule for current batch:");
        }

        // Only the first input continues with the current range id; all later inputs start at 0
        int rangeId = currentRangeId;

        for (int inputId = currentInputId; inputId < batchSize; ++inputId) {

            boolean isFirstInput = inputId == currentInputId;

            // Number of bindings of the first input that must be skipped because they were already served.
            // When bypassing the cache these bindings are instead consumed from the backend iterator below.
            long displacement = isFirstInput && !bypassCacheOnFirstInput
                    ? currentInputIdBindingsServed
                    : 0L
                    ;

            Binding inputBinding = inputs.get(inputId);

            Slice<Binding[]> slice = null;
            Lock lock = null;
            RefFuture<ServiceCacheValue> cacheValueRef = null;

            if (cache != null) {

                ServiceCacheKey cacheKey = cacheKeyFactory.createCacheKey(inputBinding);

                // Note: cacheValueRef must be closed as part of the iterators that read from the cache
                cacheValueRef = cache.getCache().claim(cacheKey);
                ServiceCacheValue serviceCacheValue = cacheValueRef.await();

                // Lock an existing cache entry so we can read out the loaded ranges
                slice = serviceCacheValue.getSlice();

                if (CacheMode.CLEAR.equals(cacheMode)) {
                    slice.clear();
                }

                lock = slice.getReadWriteLock().readLock();

                if (logger.isDebugEnabled()) {
                    logger.debug("Created cache key: " + cacheKey);
                }

                lock.lock();
            }

            RangeSet<Long> loadedRanges;
            long knownSize;
            try {
                if (slice != null) {
                    loadedRanges = slice.getLoadedRanges();
                    knownSize = slice.getKnownSize();
                } else {
                    // Without a cache nothing is loaded and the size is unknown
                    loadedRanges = TreeRangeSet.create();
                    knownSize = -1;
                }

                // Iterate the present/absent ranges
                long start = serviceInfo.getOffset();
                if (start == Query.NOLIMIT) {
                    start = 0;
                }

                // A negative limit is treated as 'unlimited', represented by Long.MAX_VALUE
                long baseLimit = serviceInfo.getLimit();
                if (baseLimit < 0) {
                    baseLimit = Long.MAX_VALUE;
                }

                // baseLimit is non-negative at this point, so only exactness decides whether
                // the known result set limit caps the request
                long limit = baseLimit;
                if (isExact) {
                    limit = Math.min(limit, resultSetLimit);
                }

                if (displacement != 0) {
                    start += displacement;
                    if (limit != Long.MAX_VALUE) {
                        limit -= displacement;
                    }
                }

                long max = knownSize < 0 ? Long.MAX_VALUE : knownSize;
                long end = limit == Long.MAX_VALUE ? max : LongMath.saturatedAdd(start, limit);
                end = Math.min(end, max);

                // The start may lie beyond the known size (or the displacement may exceed the limit).
                // Clamp so that the requested range becomes empty instead of Range.closedOpen
                // rejecting a lower bound that is greater than the upper bound.
                end = Math.max(end, start);

                Range<Long> requestedRange = end == Long.MAX_VALUE
                    ? Range.atLeast(start)
                    : Range.closedOpen(start, end);

                // Maps each sub range of the request to whether it is served from cache (true) or fetched (false)
                RangeMap<Long, Boolean> allRanges = TreeRangeMap.create();
                if (bypassCacheOnFirstInput && isFirstInput) {
                    allRanges.put(requestedRange, false);
                    // Note: If we bypass the cache we need to skip the bindings already served
                    //   based on 'currentInputIdBindingsServed'
                } else {
                    RangeSet<Long> presentRanges = loadedRanges.subRangeSet(requestedRange);
                    RangeSet<Long> absentRanges = loadedRanges.complement().subRangeSet(requestedRange);

                    presentRanges.asRanges().forEach(r -> allRanges.put(r, true));
                    absentRanges.asRanges().forEach(r -> allRanges.put(r, false));
                }

                // If the beginning of the request range is covered by a cache then serve from it
                // a global limit may prevent having to fire a backend request
                // However, as soon as we have to fire a backend request we need to ensure we don't serve
                // more data than the seen result set limit
                // If the size of the data can be greater than that limit then:
                //   - We need to start the backend request from the request offset
                //   - The issue is how to handle the next binding

                if (logger.isInfoEnabled()) {
                    logger.info("input " + inputId + ": " +
                        allRanges.toString()
                            .replace("false", "fetch")
                            .replace("true", "cached"));
                }
                Map<Range<Long>, Boolean> mapOfRanges = allRanges.asMapOfRanges();

                if (mapOfRanges.isEmpty()) {
                    // Special case if it is known that there are no bindings:
                    // Register an empty iterator for (inputId, rangeId)

                    // Release the reference to the cache entry immediately
                    if (cacheValueRef != null) {
                        cacheValueRef.close();
                    }

                    SliceKey sliceKey = new SliceKey(inputId, rangeId);
                    QueryIterPeek it = QueryIterPeek.create(new QueryIterNullIterator(execCxt), execCxt);
                    sliceKeyToIter.put(sliceKey, it);
                    sliceKeyToClose.add(sliceKey);

                    inputToRangeToOutput.put(inputId, rangeId, nextAllocOutputId);
                    outputToSliceKey.put(nextAllocOutputId, sliceKey);
                    ++rangeId;
                    ++nextAllocOutputId;
                } else {
                    RefFuture<ServiceCacheValue> finalCacheValueRef = cacheValueRef;

                    // Find the last range that is served from the cache. The iterator over that range
                    // is responsible for releasing the claim on the cache entry. Tying the release to
                    // the last range overall would leak the claim whenever a cached range is followed
                    // by a range that needs to be fetched.
                    Range<Long> lastCachedRange = null;
                    for (Entry<Range<Long>, Boolean> e : mapOfRanges.entrySet()) {
                        if (e.getValue()) {
                            lastCachedRange = e.getKey();
                        }
                    }
                    boolean usesCacheRead = lastCachedRange != null;

                    Iterator<Entry<Range<Long>, Boolean>> rangeIt = mapOfRanges.entrySet().iterator();
                    while (rangeIt.hasNext()) {
                        SliceKey sliceKey = new SliceKey(inputId, rangeId);
                        Entry<Range<Long>, Boolean> f = rangeIt.next();

                        Range<Long> range = f.getKey();
                        boolean isLoaded = f.getValue();

                        long lo = range.lowerEndpoint();
                        long hi = range.hasUpperBound() ? range.upperEndpoint() : Long.MAX_VALUE;
                        long lim = hi == Long.MAX_VALUE ? Long.MAX_VALUE : hi - lo;

                        if (isLoaded) {
                            boolean isLastCachedRange = range.equals(lastCachedRange);

                            // Accessor will be closed via channel below
                            SliceAccessor<Binding[]> accessor = slice.newSliceAccessor();

                            // Prevent eviction of the scheduled range
                            accessor.addEvictionGuard(Range.closedOpen(lo, hi));

                            // Create a channel over the accessor for sequential reading
                            // Reading from the channel internally advances the range of data claimed by the accessor
                            // Note: As an improvement the eviction guard could be managed by the channel so that data is immediately released after read.
                            // Channel will be closed via baseIt below
                            ReadableChannel<Binding[]> channel =
                                    new ReadableChannelWithLimit<>(
                                            new ReadableChannelOverSliceAccessor<>(accessor, lo),
                                            lim);

                            IteratorCloseable<Binding> baseIt = new IteratorOverReadableChannel<>(channel.getArrayOps(), channel, 1024 * 4);

                            // The close method of the last cache-backed iterator also unclaims the cache entry
                            Runnable cacheEntryCloseAction = !isLastCachedRange || finalCacheValueRef == null
                                    ? baseIt::close
                                    : () -> {
                                        baseIt.close();
                                        finalCacheValueRef.close();
                                    };

                            // Bridge the cache iterator to jena
                            QueryIterator qIterA = QueryIterPlainWrapper.create(Iter.onClose(baseIt, cacheEntryCloseAction), execCxt);

                            Map<Var, Var> normedToScoped = serviceInfo.getVisibleSubOpVarsNormedToScoped();
                            qIterA = new QueryIteratorMapped(qIterA, normedToScoped);

                            // Add a pseudo idxVar mapping
                            final long idxVarValue = nextAllocOutputId;
                            QueryIterConvert qIterB = new QueryIterConvert(qIterA, b ->
                                BindingFactory.binding(b, idxVar, NodeFactoryExtra.intToNode(idxVarValue)), execCxt);

                            QueryIterPeek it = QueryIterPeek.create(qIterB, execCxt);

                            sliceKeyToIter.put(sliceKey, it);
                            sliceKeyToClose.add(sliceKey);
                        } else {
                            // Absent range: schedule it as part of the bulk backend request
                            PartitionRequest<Binding> request = new PartitionRequest<>(nextAllocOutputId, inputBinding, lo, lim);
                            backendRequests.put(nextAllocOutputId, request);
                            sliceKeysForBackend.add(sliceKey);
                        }

                        inputToRangeToOutput.put(inputId, rangeId, nextAllocOutputId);
                        outputToSliceKey.put(nextAllocOutputId, sliceKey);
                        ++rangeId;
                        ++nextAllocOutputId;
                    }

                    // Close the reference to the cache entry; QueryIterCaching will manage
                    // claim/unclaim in batches so we don't need to leave the reference open here
                    if (!usesCacheRead && finalCacheValueRef != null) {
                        finalCacheValueRef.close();
                    }
                }
            } finally {
                if (lock != null) {
                    lock.unlock();
                }
            }

            rangeId = 0;
        }

        // Create *deferred* a remote execution if needed
        // A limit on the query may cause the deferred execution to never run
        if (!backendRequests.isEmpty()) {
            BatchQueryRewriteResult rewrite = batchQueryRewriter.rewrite(backendRequests);

            Op newSubOp = rewrite.getOp();
            OpService substitutedOp = new OpService(targetService, newSubOp, serviceInfo.getOpService().getSilent());

            // Execute the batch request and wrap it such that ...
            // (1) we can merge it with other backend and cache requests in the right order
            // (2) responses are written to the cache
            Supplier<QueryIterator> qIterSupplier = () -> {
                QueryIterator r = opExecutor.exec(substitutedOp);
                return r;
            };

            QueryIterator qIter = new QueryIterDefer(qIterSupplier);

            // Wrap the iterator such that the items are cached
            if (cache != null) {
                qIter = new QueryIterWrapperCache(qIter, cacheBulkSize, cache, cacheKeyFactory, backendRequests, idxVar, targetService);
            }

            // Apply renaming after cache to avoid mismatch between op and bindings
            qIter = QueryIter.map(qIter, rewrite.getRenames());

            // Wrap the iterator further to detect resultset limit condition
            // Wrap the query iter such that we can peek the next binding in order
            // to decide from which iterator to take the next element
            SubIterator<Binding, QueryIterator> backendItPrimary = IteratorFactoryWithBuffer.wrap(qIter);
            IteratorOnClose<Binding> jenaIt = Iter.onClose(backendItPrimary, qIter::close);
            QueryIterator iter = QueryIterPlainWrapper.create(jenaIt, execCxt);

            QueryIterPeek frontIter = QueryIterPeek.create(iter, execCxt);

            // Register the iterator for the backend request with all corresponding outputIds
            for (Integer outputId : backendRequests.getItems().keySet()) {
                SliceKey sliceKey = outputToSliceKey.get(outputId);
                sliceKeyToIter.put(sliceKey, frontIter);
            }

            // The shared backend iterator is registered for closing only once, under the last slice key
            int lastOutputId = backendRequests.getItems().lastKey();
            SliceKey lastSliceKey = outputToSliceKey.get(lastOutputId);
            sliceKeyToClose.add(lastSliceKey);

            backendIt = backendItPrimary;

            if (bypassCacheOnFirstInput) {
                // If we come here then a number of bindings was handed to the client
                // but then we weren't sure whether we can deliver any more w.r.t. the
                // backend result set size limit - consume as many bindings already handed to the client
                for (int i = 0; i < currentInputIdBindingsServed; ++i) {
                    if (backendIt.hasNext()) {
                        backendIt.next();
                    }
                }
            }
        }
    }