in algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java [69:216]
/**
 * Builds the runtime for this operator: a single-input pushable that, for every incoming
 * tuple, evaluates the branch conditions in order and copies the tuple to the first output
 * whose condition yields true, or to the default branch (when one is configured) if none does.
 *
 * @param ctx task context used to allocate the output frames and to create the evaluators
 * @param recordDescProvider source of the record descriptor of input 0
 * @param partition not referenced by this runtime
 * @param nPartitions not referenced by this runtime
 * @return the pushable runtime
 * @throws HyracksDataException if the runtime cannot be created
 */
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
return new AbstractUnaryInputOperatorNodePushable() {
// Downstream writer of each output; installed through setOutputFrameWriter().
private final IFrameWriter[] writers = new IFrameWriter[outputArity];
// Set for every writer on which open() was attempted; close() and fail() only touch those.
private final boolean[] isOpen = new boolean[outputArity];
// One output frame per output; allocated in open().
private final IFrame[] writeBuffers = new IFrame[outputArity];
// One evaluator slot per output; open() fills one slot per entry of evalFactories.
private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity];
// Reused holder for the result of each condition evaluation.
private final IPointable evalPointable = new VoidPointable();
// Record descriptor of input 0; it drives both the accessor and the tuple builder below.
private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
0);
// Reads the tuples of the frame currently being processed.
private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
// Reference to the tuple currently being routed.
private final FrameTupleReference frameTuple = new FrameTupleReference();
// Single appender shared by all outputs; it is re-pointed at an output frame with reset() before each use.
private final FrameTupleAppender tupleAppender = new FrameTupleAppender();
// Scratch space used to copy a tuple before it is appended to an output frame.
private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
// Data sink of tupleBuilder; field bytes are written through it.
private final DataOutput tupleDos = tupleBuilder.getDataOutput();
/**
 * Writes out the pending frame of every output on which {@code open()} was attempted and
 * then closes that output's writer.
 * <p>
 * A failure on one output does not stop the remaining outputs from being processed: every
 * throwable is collected as a suppressed exception of a single {@link HyracksDataException},
 * which is thrown once all outputs have been handled. The writer is closed even when writing
 * its pending frame failed.
 *
 * @throws HyracksDataException if writing or closing failed for at least one output
 */
@Override
public void close() throws HyracksDataException {
    HyracksDataException hde = null;
    for (int i = 0; i < outputArity; i++) {
        if (!isOpen[i]) {
            continue;
        }
        try {
            // The output frames are only allocated after every writer has been opened. If
            // open() failed part-way, no frame exists and nothing was buffered, so skip the
            // write instead of handing a null frame to the appender and reporting a
            // spurious error on top of the original failure.
            if (writeBuffers[i] != null) {
                tupleAppender.reset(writeBuffers[i], false);
                tupleAppender.write(writers[i], false);
            }
        } catch (Throwable th) {
            if (hde == null) {
                hde = new HyracksDataException();
            }
            hde.addSuppressed(th);
        } finally {
            // Always release the writer, even if the final write failed.
            try {
                writers[i].close();
            } catch (Throwable th) {
                if (hde == null) {
                    hde = new HyracksDataException();
                }
                hde.addSuppressed(th);
            }
        }
    }
    if (hde != null) {
        throw hde;
    }
}
/**
 * Flushes the pending frame of every output to its writer, in output order.
 *
 * @throws HyracksDataException if flushing an output fails; later outputs are then not flushed
 */
@Override
public void flush() throws HyracksDataException {
    // writers holds exactly one slot per output.
    for (int out = 0; out < writers.length; out++) {
        // The appender is shared, so point it at this output's frame before flushing.
        tupleAppender.reset(writeBuffers[out], false);
        tupleAppender.flush(writers[out]);
    }
}
/**
 * Forwards a failure notification to every output writer on which {@code open()} was
 * attempted.
 * <p>
 * All such writers are notified even if some of them throw; the throwables are collected as
 * suppressed exceptions of one {@link HyracksDataException} that is thrown at the end.
 *
 * @throws HyracksDataException if at least one writer threw while being notified
 */
@Override
public void fail() throws HyracksDataException {
    HyracksDataException collected = null;
    for (int out = 0; out < outputArity; out++) {
        if (!isOpen[out]) {
            continue;
        }
        try {
            writers[out].fail();
        } catch (Throwable t) {
            if (collected == null) {
                collected = new HyracksDataException();
            }
            collected.addSuppressed(t);
        }
    }
    if (collected == null) {
        return;
    }
    throw collected;
}
/**
 * Routes every tuple of the incoming frame to at most one output.
 * <p>
 * The branch conditions are evaluated in output order and the tuple is copied to the first
 * output whose condition evaluates to true. If no condition matches, the tuple goes to the
 * default branch when one is configured; otherwise it is not written anywhere.
 *
 * @param buffer frame holding the tuples to route
 * @throws HyracksDataException if a condition cannot be evaluated or a tuple cannot be appended
 */
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
    accessor.reset(buffer);
    int tupleCount = accessor.getTupleCount();
    for (int i = 0; i < tupleCount; i++) {
        frameTuple.reset(accessor, i);
        boolean found = false;
        for (int j = 0; j < evals.length; j++) {
            // evals has one slot per output, but open() only creates as many evaluators as
            // there are factories. An empty slot carries no condition (e.g. an output that
            // serves purely as the default branch), so skip it rather than dereference null.
            if (evals[j] == null) {
                continue;
            }
            try {
                evals[j].evaluate(frameTuple, evalPointable);
            } catch (AlgebricksException e) {
                throw new HyracksDataException(e);
            }
            found = boolInspector.getBooleanValue(evalPointable.getByteArray(),
                    evalPointable.getStartOffset(), evalPointable.getLength());
            if (found) {
                // First matching branch wins; later conditions are not evaluated.
                copyAndAppendTuple(j);
                break;
            }
        }
        // Optionally write to default output branch.
        if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
            copyAndAppendTuple(defaultBranchIndex);
        }
    }
}
/**
 * Copies the tuple currently referenced by {@code frameTuple} into the tuple builder and
 * appends the copy to the frame of the given output.
 *
 * @param outputIndex index of the output that receives the tuple
 * @throws HyracksDataException if copying the fields or appending the tuple fails
 */
private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
    try {
        // Rebuild the tuple field by field in the scratch builder.
        tupleBuilder.reset();
        for (int field = 0; field < frameTuple.getFieldCount(); field++) {
            final byte[] data = frameTuple.getFieldData(field);
            final int start = frameTuple.getFieldStart(field);
            final int length = frameTuple.getFieldLength(field);
            tupleDos.write(data, start, length);
            tupleBuilder.addFieldEndOffset();
        }
    } catch (IOException ioe) {
        throw new HyracksDataException(ioe);
    }
    // The appender is shared between outputs: attach it to the target frame without clearing it.
    tupleAppender.reset(writeBuffers[outputIndex], false);
    FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(),
            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
}
/**
 * Prepares the runtime in three steps: opens every output writer, allocates and clears one
 * output frame per output, and creates one evaluator per evaluator factory.
 *
 * @throws HyracksDataException if a writer cannot be opened, a frame cannot be allocated, or
 *             an evaluator cannot be created
 */
@Override
public void open() throws HyracksDataException {
    // The flag is raised before the call, so close() and fail() also reach a writer whose
    // open() threw.
    for (int out = 0; out < outputArity; out++) {
        isOpen[out] = true;
        writers[out].open();
    }

    // Allocate the output frames.
    for (int out = 0; out < writeBuffers.length; out++) {
        writeBuffers[out] = new VSizeFrame(ctx);
        // The appender is shared by all outputs, so every frame is cleared up front.
        tupleAppender.reset(writeBuffers[out], true);
    }

    // Instantiate the branch conditions.
    try {
        for (int branch = 0; branch < evalFactories.length; branch++) {
            evals[branch] = evalFactories[branch].createScalarEvaluator(ctx);
        }
    } catch (AlgebricksException ae) {
        throw new HyracksDataException(ae);
    }
}
/**
 * Registers the writer that receives the tuples routed to the given output.
 *
 * @param index output index the writer belongs to
 * @param writer downstream writer for that output
 * @param recordDesc not used by this runtime
 */
@Override
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
    this.writers[index] = writer;
}
};
}