public IFrameWriter getInputFrameWriter()

in hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java [232:420]


        /**
         * Creates the frame writer through which input {@code index} pushes its frames into the intersection.
         *
         * <p>All returned writers coordinate through the monitor of the enclosing
         * {@code IntersectOperatorNodePushable}. A bit set in {@code consumed} means that the corresponding input
         * has no unprocessed tuples left in its current frame. The writer of input 0 drives the downstream
         * {@code writer} (open/close/fail) and runs the actual intersection; the writers of the other inputs only
         * publish their frame and block until it has been consumed or the operator is done.
         *
         * <p>NOTE(review): the merge in {@code intersectAllInputs} only advances cursors forward, so it presumably
         * relies on every input being sorted on its compare fields - confirm against the operator contract.
         *
         * @param index the position of the input this writer serves
         * @return a frame writer bound to input {@code index}
         */
        public IFrameWriter getInputFrameWriter(final int index) {
            return new IFrameWriter() {
                // Scratch buffers that receive the normalized first key of the two tuples being compared.
                private final int[] normalizedKey1 =
                        NormalizedKeyUtils.createNormalizedKeyArray(firstKeyNormalizerComputer);
                private final int[] normalizedKey2 =
                        NormalizedKeyUtils.createNormalizedKeyArray(firstKeyNormalizerComputer);

                /**
                 * Opens the downstream writer; only the writer of input 0 does so, the others are no-ops.
                 */
                @Override
                public void open() throws HyracksDataException {
                    if (index == 0) {
                        writer.open();
                    }
                }

                /**
                 * Publishes {@code buffer} as the current frame of this input and blocks until it is consumed.
                 *
                 * <p>Frames arriving after the operator is done are ignored. For input 0 the calling thread runs
                 * the intersection each time every input has an unconsumed frame, until its own frame is consumed
                 * or the operator is done. For every other input the calling thread waits until its frame is
                 * consumed or the operator is done.
                 *
                 * @param buffer the incoming frame
                 * @throws HyracksDataException if the wait is interrupted or producing output fails
                 */
                @Override
                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                    synchronized (IntersectOperatorNodePushable.this) {
                        if (done) {
                            return;
                        }
                        // Register the new frame: rewind this input's cursor and mark the input as not consumed.
                        refAccessor[index].reset(buffer);
                        tupleIndexMarker[index] = 0;
                        consumed.clear(index);
                        if (index != 0) {
                            // Wake the waiting threads once this frame completes the set of pending inputs.
                            if (allInputArrived()) {
                                IntersectOperatorNodePushable.this.notifyAll();
                            }
                            while (!consumed.get(index) && !done) {
                                waitOrHyracksException();
                            }
                        } else { //(index == 0)
                            while (!consumed.get(0)) {
                                while (!allInputArrived() && !done) {
                                    waitOrHyracksException();
                                }
                                if (done) {
                                    break;
                                }
                                intersectAllInputs();
                                // Release the inputs whose frames were exhausted by this round.
                                IntersectOperatorNodePushable.this.notifyAll();
                            }
                        }
                    }
                }

                /**
                 * Waits on the enclosing operator's monitor; the caller must already hold that monitor.
                 *
                 * @throws HyracksDataException wrapping the {@link InterruptedException} if the wait is interrupted
                 */
                private void waitOrHyracksException() throws HyracksDataException {
                    try {
                        IntersectOperatorNodePushable.this.wait();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so callers further up the stack can still observe it.
                        Thread.currentThread().interrupt();
                        throw HyracksDataException.create(e);
                    }
                }

                /**
                 * @return {@code true} when no input is marked consumed, i.e. every input has pending tuples
                 */
                private boolean allInputArrived() {
                    return consumed.cardinality() == 0;
                }

                /**
                 * Intersects the current frames of all inputs until at least one of them is exhausted, then
                 * flushes the appender to the downstream writer.
                 *
                 * @throws HyracksDataException if comparing tuples or writing output fails
                 */
                private void intersectAllInputs() throws HyracksDataException {
                    do {
                        int maxInput = findMaxInput();
                        // The max input trivially matches itself, hence the count starts at one.
                        int match = 1;
                        boolean needToUpdateMax = false;
                        for (int i = 0; i < inputArity; i++) {
                            if (i == maxInput) {
                                continue;
                            }
                            // Skip tuples of input i that are smaller than the current max tuple.
                            while (tupleIndexMarker[i] < refAccessor[i].getTupleCount()) {
                                int cmp = compare(i, refAccessor[i], tupleIndexMarker[i], maxInput,
                                        refAccessor[maxInput], tupleIndexMarker[maxInput]);
                                if (cmp == 0) {
                                    match++;
                                    break;
                                } else if (cmp < 0) {
                                    tupleIndexMarker[i]++;
                                } else {
                                    // Input i is already past the max tuple, so the max tuple cannot match.
                                    needToUpdateMax = true;
                                    break;
                                }
                            }

                            if (tupleIndexMarker[i] >= refAccessor[i].getTupleCount()) {
                                consumed.set(i);
                            }
                        }
                        if (match == inputArity) {
                            // Every input holds the same key: emit the projection once and advance all cursors.
                            FrameUtils.appendProjectionToWriter(writer, appender, refAccessor[maxInput],
                                    tupleIndexMarker[maxInput], allProjectFields[maxInput]);
                            for (int i = 0; i < inputArity; i++) {
                                tupleIndexMarker[i]++;
                                if (tupleIndexMarker[i] >= refAccessor[i].getTupleCount()) {
                                    consumed.set(i);
                                }
                            }
                        } else if (needToUpdateMax) {
                            tupleIndexMarker[maxInput]++;
                            if (tupleIndexMarker[maxInput] >= refAccessor[maxInput].getTupleCount()) {
                                consumed.set(maxInput);
                            }
                        }

                        // Keep going only while no input has run out of tuples.
                    } while (consumed.nextSetBit(0) < 0);
                    appender.write(writer, true);
                }

                /**
                 * Compares tuple {@code tid1} of input {@code input1} with tuple {@code tid2} of input
                 * {@code input2} on their compare fields.
                 *
                 * <p>When a first-key normalizer is configured, the normalized keys are compared first; that
                 * result is returned if it is non-zero or if {@code normalizedKeyDecisive} is set. Otherwise the
                 * comparators are applied field by field and the first non-zero result is returned.
                 *
                 * @return the first non-zero comparison result, or zero if every comparison is zero
                 */
                private int compare(int input1, FrameTupleAccessor frameTupleAccessor1, int tid1, int input2,
                        FrameTupleAccessor frameTupleAccessor2, int tid2) throws HyracksDataException {
                    if (firstKeyNormalizerComputer != null) {
                        getFirstNorm(input1, frameTupleAccessor1, tid1, normalizedKey1);
                        getFirstNorm(input2, frameTupleAccessor2, tid2, normalizedKey2);
                        int cmp = NormalizedKeyUtils.compareNormalizeKeys(normalizedKey1, 0, normalizedKey2, 0,
                                normalizedKey1.length);
                        if (cmp != 0 || normalizedKeyDecisive) {
                            return cmp;
                        }
                    }

                    for (int i = 0; i < comparators.length; i++) {
                        int cmp = comparators[i].compare(frameTupleAccessor1.getBuffer().array(),
                                frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[input1][i]),
                                frameTupleAccessor1.getFieldLength(tid1, compareFields[input1][i]),
                                frameTupleAccessor2.getBuffer().array(),
                                frameTupleAccessor2.getAbsoluteFieldStartOffset(tid2, compareFields[input2][i]),
                                frameTupleAccessor2.getFieldLength(tid2, compareFields[input2][i]));

                        if (cmp != 0) {
                            return cmp;
                        }
                    }
                    return 0;
                }

                /**
                 * Writes the normalized form of the first compare field of tuple {@code tid1} into {@code keys},
                 * starting at offset 0. Does nothing when no first-key normalizer is configured.
                 */
                private void getFirstNorm(int inputId1, FrameTupleAccessor frameTupleAccessor1, int tid1, int[] keys) {
                    if (firstKeyNormalizerComputer != null) {
                        firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(),
                                frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[inputId1][0]),
                                frameTupleAccessor1.getFieldLength(tid1, compareFields[inputId1][0]), keys, 0);
                    }
                }

                /**
                 * @return the index of the input whose current tuple compares greatest; on ties the lowest
                 *         index wins
                 */
                private int findMaxInput() throws HyracksDataException {
                    int max = 0;
                    for (int i = 1; i < inputArity; i++) {
                        int cmp = compare(max, refAccessor[max], tupleIndexMarker[max], i, refAccessor[i],
                                tupleIndexMarker[i]);
                        if (cmp < 0) {
                            max = i;
                        }
                    }
                    return max;
                }

                /**
                 * Marks the operator as done; the writer of input 0 also fails the downstream writer.
                 */
                @Override
                public void fail() throws HyracksDataException {
                    clearStateWith(ACTION.FAILED);
                }

                /**
                 * Marks the operator as done; the writer of input 0 also closes the downstream writer.
                 */
                @Override
                public void close() throws HyracksDataException {
                    clearStateWith(ACTION.CLOSE);
                }

                /**
                 * Applies {@code action} to the downstream writer when this is input 0, then, unless the operator
                 * is already done, marks this input consumed, drops its accessor, sets {@code done} and wakes
                 * every waiting thread.
                 *
                 * @param action the action to apply to the downstream writer
                 * @throws HyracksDataException if the downstream writer fails to close or fail
                 */
                private void clearStateWith(ACTION action) throws HyracksDataException {
                    synchronized (IntersectOperatorNodePushable.this) {
                        try {
                            if (index == 0) {
                                doAction(action);
                            }
                        } finally {
                            // Runs even if the writer throws; otherwise threads blocked in wait() never wake up.
                            if (!done) {
                                consumed.set(index);
                                refAccessor[index] = null;
                                done = true;
                                IntersectOperatorNodePushable.this.notifyAll();
                            }
                        }
                    }
                }

                /**
                 * Forwards {@code action} to the downstream writer: close for CLOSE, fail for FAILED.
                 */
                private void doAction(ACTION action) throws HyracksDataException {
                    switch (action) {
                        case CLOSE:
                            writer.close();
                            break;
                        case FAILED:
                            writer.fail();
                            break;
                    }
                }

            };
        }