in hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java [253:404]
/**
 * Produces one output tuple for every tuple of the given partition chunk.
 * <p>
 * For each tuple in the chunk's index range this method emits the running aggregates via
 * {@code produceTuple}, re-initializes the nested aggregates, evaluates the frame start / end /
 * offset / exclusion expressions against the current tuple, scans partition chunks through
 * {@code partitionReader} feeding every tuple that belongs to the frame into
 * {@code nestedAggAggregate}, and finally writes the nested aggregate result and appends the
 * built tuple to the output frame.
 * <p>
 * The reader position is saved in {@code PARTITION_POSITION_SLOT} on entry and restored before
 * the method returns normally.
 *
 * @param chunkIdx index of the chunk inside the current partition; {@code 0} is treated as the
 *            first chunk of the partition
 * @param chunkFrame frame whose buffer holds the tuples of that chunk
 * @throws HyracksDataException declared by this method; NOTE(review): presumably propagated from
 *             the evaluators, the partition reader or the aggregators - confirm against callees
 */
protected void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
// remember where the reader currently is; the frame scan below moves it around
partitionReader.savePosition(PARTITION_POSITION_SLOT);
int nChunks = getPartitionChunkCount();
boolean isFirstChunkInPartition = chunkIdx == 0;
tAccess.reset(chunkFrame.getBuffer());
int tBeginIdx = getTupleBeginIdx(chunkIdx);
int tEndIdx = getTupleEndIdx(chunkIdx);
// outer loop: one iteration (and one output tuple) per tuple of this chunk
for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) {
boolean isFirstTupleInPartition = isFirstChunkInPartition && tIdx == tBeginIdx;
tRef.reset(tAccess, tIdx);
// running aggregates
produceTuple(tupleBuilder, tAccess, tIdx, tRef);
// nested aggregates
nestedAggInit();
// frame boundaries
// frameValid turns false as soon as any validation below fails; in that case the frame scan
// is skipped entirely and only the freshly initialized nested aggregates are output
boolean frameValid = true;
if (frameStartExists) {
if (frameStartValidationExists) {
evaluate(frameStartValidationEvals, tRef, frameStartValidationPointables);
frameValid = allTrue(frameStartValidationPointables, booleanAccessor);
}
if (frameValid) {
evaluate(frameStartEvals, tRef, frameStartPointables);
}
}
// the frame end is only validated/evaluated if the frame start did not invalidate the frame
if (frameValid && frameEndExists) {
if (frameEndValidationExists) {
evaluate(frameEndValidationEvals, tRef, frameEndValidationPointables);
frameValid = allTrue(frameEndValidationPointables, booleanAccessor);
}
if (frameValid) {
evaluate(frameEndEvals, tRef, frameEndPointables);
}
}
// number of in-frame, non-excluded tuples to pass over before aggregation starts
int toSkip = 0;
if (frameValid && frameOffsetExists) {
frameOffsetEval.evaluate(tRef, tmpPointable);
toSkip = integerAccessor.getIntegerValue(tmpPointable.getByteArray(), tmpPointable.getStartOffset(),
tmpPointable.getLength());
// a negative offset invalidates the frame
frameValid = toSkip >= 0;
}
if (frameValid) {
if (frameExcludeExists) {
evaluate(frameExcludeEvals, tRef, frameExcludePointables);
}
// remaining number of tuples that may be aggregated; the counter is only decremented while
// positive, so a negative initial value never reaches zero and does not cut the scan short
// NOTE(review): presumably a negative frameMaxObjects means "no limit" - confirm
int toWrite = frameMaxObjects;
// when the frame start is monotonic and an earlier iteration recorded a start position,
// resume the scan from that position instead of from the first chunk
boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
// -1 means "use the chunk's own begin index" (see its use inside the loop)
int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;
// reposition the reader only if the scan loop below will actually run
if (chunkIdxInnerStart < nChunks) {
if (frameStartForward && !isFirstTupleInPartition) {
partitionReader.restorePosition(FRAME_POSITION_SLOT);
} else {
partitionReader.rewind();
}
}
// position of the first in-frame tuple found during this scan; -1 while none was found
int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;
frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
// save the position before reading the chunk, so that it can be copied into
// FRAME_POSITION_SLOT if this chunk turns out to contain the frame start
partitionReader.savePosition(TMP_POSITION_SLOT);
IFrame frameInner = partitionReader.nextFrame(false);
tAccess2.reset(frameInner.getBuffer());
int tBeginIdxInner;
if (tBeginIdxInnerStart < 0) {
tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
} else {
// the remembered tuple index applies to the first scanned chunk only;
// it is cleared so that later chunks start from their own begin index
tBeginIdxInner = tBeginIdxInnerStart;
tBeginIdxInnerStart = -1;
}
int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner; tIdxInner++) {
tRef2.reset(tAccess2, tIdxInner);
if (frameStartExists || frameEndExists) {
// compute the candidate tuple's frame value and compare it with the boundaries
evaluate(frameValueEvals, tRef2, frameValuePointables);
if (frameStartExists) {
if (frameValueComparators.compare(frameValuePointables, frameStartPointables) < 0) {
// skip if value < start
continue;
}
// inside the frame
if (chunkIdxFrameStartLocal < 0) {
// save position of the first tuple in this frame
// will continue from it in the next frame iteration
chunkIdxFrameStartLocal = chunkIdxInner;
tBeginIdxFrameStartLocal = tIdxInner;
partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT);
}
}
if (frameEndExists
&& frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
// value > end => beyond the frame end
// exit the frame loop
break frame_loop;
}
}
if ((frameExcludeExists && isExcluded()) || (frameExcludeUnaryExists && isExcludedUnary())) {
// skip if excluded
continue;
}
if (toSkip > 0) {
// skip if offset hasn't been reached
toSkip--;
continue;
}
// aggregate the tuple unless the limit is already exhausted
if (toWrite != 0) {
nestedAggAggregate(tAccess2, tIdxInner);
}
// count down only a positive limit
if (toWrite > 0) {
toWrite--;
}
// limit reached => stop scanning the frame
if (toWrite == 0) {
break frame_loop;
}
}
}
// remember where this frame started so that the scan for the next tuple can resume from there
if (frameStartIsMonotonic) {
if (chunkIdxFrameStartLocal >= 0) {
chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
} else {
// frame start not found, set it beyond the last chunk
chunkIdxFrameStartGlobal = nChunks;
tBeginIdxFrameStartGlobal = 0;
}
}
}
// write the nested aggregate result into the tuple under construction and emit the tuple
nestedAggOutputFinalResult(tupleBuilder);
appendToFrameFromTupleBuilder(tupleBuilder);
}
// put the reader back where it was when this method was entered
partitionReader.restorePosition(PARTITION_POSITION_SLOT);
}