public IOperatorNodePushable createPushRuntime()

in algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java [69:216]


    /**
     * Creates the push runtime that routes every input tuple to at most one output branch.
     * <p>
     * For each tuple the branch predicates are evaluated in order; the tuple is copied unchanged to the
     * first branch whose predicate is true according to {@code boolInspector}. A tuple that satisfies no
     * predicate is copied to {@code defaultBranchIndex}, unless that equals {@code NO_DEFAULT_BRANCH},
     * in which case the tuple is dropped.
     *
     * @param ctx
     *            task context used to allocate the output frames and to create the predicate evaluators
     * @param recordDescProvider
     *            supplies the record descriptor of input 0, which describes both the input tuples and
     *            the copies written to the outputs
     * @param partition
     *            not used by this runtime
     * @param nPartitions
     *            not used by this runtime
     * @return the pushable runtime; output writers must be registered through
     *         {@code setOutputFrameWriter} before {@code open()} is called
     * @throws HyracksDataException
     *             if the runtime cannot be created
     */
    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
                    throws HyracksDataException {
        return new AbstractUnaryInputOperatorNodePushable() {
            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
            // isOpen[i] is set before writers[i].open() is attempted, so close() and fail() also reach a
            // writer whose open() threw.
            private final boolean[] isOpen = new boolean[outputArity];
            private final IFrame[] writeBuffers = new IFrame[outputArity];
            // One evaluator per predicate factory. Sized by evalFactories rather than outputArity so that
            // an output branch without a predicate never leaves a null slot for nextFrame() to dereference.
            private final IScalarEvaluator[] evals = new IScalarEvaluator[evalFactories.length];
            private final IPointable evalPointable = new VoidPointable();
            private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
                    0);
            private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
            private final FrameTupleReference frameTuple = new FrameTupleReference();

            // A single appender is shared by all outputs; it is re-pointed at the target frame before each use.
            private final FrameTupleAppender tupleAppender = new FrameTupleAppender();
            private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
            private final DataOutput tupleDos = tupleBuilder.getDataOutput();

            /**
             * Writes the pending frame of every opened output and closes its writer. All outputs are
             * processed even if some fail; the failures are reported together as suppressed exceptions
             * of a single {@link HyracksDataException}.
             */
            @Override
            public void close() throws HyracksDataException {
                HyracksDataException hde = null;
                for (int i = 0; i < outputArity; i++) {
                    if (!isOpen[i]) {
                        continue;
                    }
                    try {
                        // The frame is still null if open() failed before the write buffers were allocated;
                        // there is nothing to write then, but the writer must still be closed.
                        if (writeBuffers[i] != null) {
                            tupleAppender.reset(writeBuffers[i], false);
                            tupleAppender.write(writers[i], false);
                        }
                    } catch (Throwable th) {
                        hde = collectFailure(hde, th);
                    } finally {
                        try {
                            writers[i].close();
                        } catch (Throwable th) {
                            hde = collectFailure(hde, th);
                        }
                    }
                }
                if (hde != null) {
                    throw hde;
                }
            }

            /** Flushes the pending frame of every output to its writer. */
            @Override
            public void flush() throws HyracksDataException {
                for (int i = 0; i < outputArity; i++) {
                    tupleAppender.reset(writeBuffers[i], false);
                    tupleAppender.flush(writers[i]);
                }
            }

            /**
             * Propagates the failure to every opened output writer. All writers are notified even if some
             * of them throw; those failures are reported together as suppressed exceptions of a single
             * {@link HyracksDataException}.
             */
            @Override
            public void fail() throws HyracksDataException {
                HyracksDataException hde = null;
                for (int i = 0; i < outputArity; i++) {
                    if (!isOpen[i]) {
                        continue;
                    }
                    try {
                        writers[i].fail();
                    } catch (Throwable th) {
                        hde = collectFailure(hde, th);
                    }
                }
                if (hde != null) {
                    throw hde;
                }
            }

            /**
             * Routes each tuple of the frame to the first branch whose predicate holds, or to the default
             * branch when no predicate holds and a default branch is configured.
             */
            @Override
            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                accessor.reset(buffer);
                int tupleCount = accessor.getTupleCount();
                for (int i = 0; i < tupleCount; i++) {
                    frameTuple.reset(accessor, i);
                    boolean found = false;
                    for (int j = 0; j < evals.length; j++) {
                        try {
                            evals[j].evaluate(frameTuple, evalPointable);
                        } catch (AlgebricksException e) {
                            throw new HyracksDataException(e);
                        }
                        found = boolInspector.getBooleanValue(evalPointable.getByteArray(),
                                evalPointable.getStartOffset(), evalPointable.getLength());
                        if (found) {
                            // First matching predicate wins; later predicates are not evaluated.
                            copyAndAppendTuple(j);
                            break;
                        }
                    }
                    // Optionally write to default output branch.
                    if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
                        copyAndAppendTuple(defaultBranchIndex);
                    }
                }
            }

            /** Copies the current {@code frameTuple} field by field and appends it to the given output. */
            private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
                // Copy tuple into tuple builder.
                try {
                    tupleBuilder.reset();
                    for (int i = 0; i < frameTuple.getFieldCount(); i++) {
                        tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
                                frameTuple.getFieldLength(i));
                        tupleBuilder.addFieldEndOffset();
                    }
                } catch (IOException e) {
                    throw new HyracksDataException(e);
                }
                // Re-point the shared appender at this output's frame without clearing what it already holds.
                tupleAppender.reset(writeBuffers[outputIndex], false);
                FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(),
                        tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
            }

            /** Records {@code th} as a suppressed exception of {@code hde}, creating the carrier on first use. */
            private HyracksDataException collectFailure(HyracksDataException hde, Throwable th) {
                HyracksDataException carrier = hde == null ? new HyracksDataException() : hde;
                carrier.addSuppressed(th);
                return carrier;
            }

            /** Opens every output writer, then allocates the output frames and the predicate evaluators. */
            @Override
            public void open() throws HyracksDataException {
                for (int i = 0; i < writers.length; i++) {
                    // Mark before opening so that a writer whose open() throws is still closed later.
                    isOpen[i] = true;
                    writers[i].open();
                }
                // Create write buffers.
                for (int i = 0; i < outputArity; i++) {
                    writeBuffers[i] = new VSizeFrame(ctx);
                    // Make sure to clear all buffers, since we are reusing the tupleAppender.
                    tupleAppender.reset(writeBuffers[i], true);
                }
                // Create evaluators for partitioning.
                try {
                    for (int i = 0; i < evalFactories.length; i++) {
                        evals[i] = evalFactories[i].createScalarEvaluator(ctx);
                    }
                } catch (AlgebricksException e) {
                    throw new HyracksDataException(e);
                }
            }

            /** Registers the writer for output {@code index}; {@code recordDesc} is not used. */
            @Override
            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                writers[index] = writer;
            }
        };
    }