protected void producePartitionTuples()

in hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java [253:404]


    protected void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
        partitionReader.savePosition(PARTITION_POSITION_SLOT);

        int nChunks = getPartitionChunkCount();
        boolean isFirstChunkInPartition = chunkIdx == 0;

        tAccess.reset(chunkFrame.getBuffer());
        int tBeginIdx = getTupleBeginIdx(chunkIdx);
        int tEndIdx = getTupleEndIdx(chunkIdx);

        for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) {
            boolean isFirstTupleInPartition = isFirstChunkInPartition && tIdx == tBeginIdx;

            tRef.reset(tAccess, tIdx);

            // running aggregates
            produceTuple(tupleBuilder, tAccess, tIdx, tRef);

            // nested aggregates
            nestedAggInit();

            // frame boundaries
            boolean frameValid = true;
            if (frameStartExists) {
                if (frameStartValidationExists) {
                    evaluate(frameStartValidationEvals, tRef, frameStartValidationPointables);
                    frameValid = allTrue(frameStartValidationPointables, booleanAccessor);
                }
                if (frameValid) {
                    evaluate(frameStartEvals, tRef, frameStartPointables);
                }
            }

            if (frameValid && frameEndExists) {
                if (frameEndValidationExists) {
                    evaluate(frameEndValidationEvals, tRef, frameEndValidationPointables);
                    frameValid = allTrue(frameEndValidationPointables, booleanAccessor);
                }
                if (frameValid) {
                    evaluate(frameEndEvals, tRef, frameEndPointables);
                }
            }

            int toSkip = 0;
            if (frameValid && frameOffsetExists) {
                frameOffsetEval.evaluate(tRef, tmpPointable);
                toSkip = integerAccessor.getIntegerValue(tmpPointable.getByteArray(), tmpPointable.getStartOffset(),
                        tmpPointable.getLength());
                frameValid = toSkip >= 0;
            }

            if (frameValid) {
                if (frameExcludeExists) {
                    evaluate(frameExcludeEvals, tRef, frameExcludePointables);
                }
                int toWrite = frameMaxObjects;

                boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
                int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
                int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;

                if (chunkIdxInnerStart < nChunks) {
                    if (frameStartForward && !isFirstTupleInPartition) {
                        partitionReader.restorePosition(FRAME_POSITION_SLOT);
                    } else {
                        partitionReader.rewind();
                    }
                }

                int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;

                frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
                    partitionReader.savePosition(TMP_POSITION_SLOT);
                    IFrame frameInner = partitionReader.nextFrame(false);
                    tAccess2.reset(frameInner.getBuffer());

                    int tBeginIdxInner;
                    if (tBeginIdxInnerStart < 0) {
                        tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
                    } else {
                        tBeginIdxInner = tBeginIdxInnerStart;
                        tBeginIdxInnerStart = -1;
                    }
                    int tEndIdxInner = getTupleEndIdx(chunkIdxInner);

                    for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner; tIdxInner++) {
                        tRef2.reset(tAccess2, tIdxInner);

                        if (frameStartExists || frameEndExists) {
                            evaluate(frameValueEvals, tRef2, frameValuePointables);
                            if (frameStartExists) {
                                if (frameValueComparators.compare(frameValuePointables, frameStartPointables) < 0) {
                                    // skip if value < start
                                    continue;
                                }
                                // inside the frame
                                if (chunkIdxFrameStartLocal < 0) {
                                    // save position of the first tuple in this frame
                                    // will continue from it in the next frame iteration
                                    chunkIdxFrameStartLocal = chunkIdxInner;
                                    tBeginIdxFrameStartLocal = tIdxInner;
                                    partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
                                }
                            }
                            if (frameEndExists
                                    && frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
                                // value > end => beyond the frame end
                                // exit the frame loop
                                break frame_loop;
                            }
                        }
                        if ((frameExcludeExists && isExcluded()) || (frameExcludeUnaryExists && isExcludedUnary())) {
                            // skip if excluded
                            continue;
                        }

                        if (toSkip > 0) {
                            // skip if offset hasn't been reached
                            toSkip--;
                            continue;
                        }

                        if (toWrite != 0) {
                            nestedAggAggregate(tAccess2, tIdxInner);
                        }
                        if (toWrite > 0) {
                            toWrite--;
                        }
                        if (toWrite == 0) {
                            break frame_loop;
                        }
                    }
                }

                if (frameStartIsMonotonic) {
                    if (chunkIdxFrameStartLocal >= 0) {
                        chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
                        tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
                    } else {
                        // frame start not found, set it beyond the last chunk
                        chunkIdxFrameStartGlobal = nChunks;
                        tBeginIdxFrameStartGlobal = 0;
                    }
                }
            }

            nestedAggOutputFinalResult(tupleBuilder);
            appendToFrameFromTupleBuilder(tupleBuilder);
        }

        partitionReader.restorePosition(PARTITION_POSITION_SLOT);
    }