in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java [360:557]
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
: predEvaluatorFactory.createPredicateEvaluator());
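// Probe-and-join phase: consumes the probe input, joins tuples that hash to the
// memory-resident build partitions immediately, and spills the rest to per-partition
// run files that are joined pairwise with the spilled build partitions in close().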
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private BuildAndPartitionTaskState state;
private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(rd0);
private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
hashFunctionFactories);
private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
hashFunctionFactories);
private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender();
private final FrameTupleAppender ftap = new FrameTupleAppender();
private final IFrame inBuffer = new VSizeFrame(ctx);
private final IFrame outBuffer = new VSizeFrame(ctx);
private RunFileWriter[] buildWriters;
private RunFileWriter[] probeWriters;
private IFrame[] bufferForPartitions;
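// Fetch the state produced by the build activity, pick up its spilled-partition
// writers, and allocate one staging frame per partition for the probe side.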
@Override
public void open() throws HyracksDataException {
writer.open();
state = (BuildAndPartitionTaskState) ctx.getStateObject(
new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
buildWriters = state.fWriters;
probeWriters = new RunFileWriter[state.nPartitions];
bufferForPartitions = new IFrame[state.nPartitions];
for (int i = 0; i < state.nPartitions; i++) {
bufferForPartitions[i] = new VSizeFrame(ctx);
}
appender.reset(outBuffer, true);
ftap.reset(inBuffer, true);
}
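// Route each incoming probe frame: if the build side spilled, hash every tuple to a
// partition; otherwise the whole build relation is memory-resident and the frame can
// be joined directly.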
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (state.memoryForHashtable != memsize - 2) {
accessorProbe.reset(buffer);
int tupleCount0 = accessorProbe.getTupleCount();
for (int i = 0; i < tupleCount0; ++i) {
int entry;
if (state.memoryForHashtable == 0) {
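// No frames were reserved for the in-memory table during the build phase,
// so every probe tuple is staged and spilled to its partition's run file.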
entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
boolean newBuffer = false;
IFrame outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf.getBuffer());
outbuf.reset();
newBuffer = true;
}
}
} else {
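// Hybrid case: hash with the same bucket count the build phase used.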
entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
if (entry < state.memoryForHashtable) {
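// The matching build partition is memory-resident: collect the tuple and
// join the staged frame against the in-memory table whenever it fills up.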
while (true) {
if (!ftap.append(accessorProbe, i)) {
state.joiner.join(inBuffer.getBuffer(), writer);
ftap.reset(inBuffer, true);
} else {
break;
}
}
} else {
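// The matching build partition was spilled: fold the bucket back onto the
// partition range and stage the tuple for its run file.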
entry %= state.nPartitions;
boolean newBuffer = false;
IFrame outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf.getBuffer());
outbuf.reset();
newBuffer = true;
}
}
}
}
}
} else {
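// The entire build side fit in memory, so the probe frame is joined directly.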
state.joiner.join(buffer, writer);
}
}
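// Finish the in-memory join, then join each spilled build/probe partition pair
// from its run files before closing the downstream writer.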
@Override
public void close() throws HyracksDataException {
try {
state.joiner.join(inBuffer.getBuffer(), writer);
state.joiner.closeJoin(writer);
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
.createPartitioner();
ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
.createPartitioner();
if (state.memoryForHashtable != memsize - 2) {
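// Flush any partially filled probe staging frames and close the probe run files.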
for (int i = 0; i < state.nPartitions; i++) {
ByteBuffer buf = bufferForPartitions[i].getBuffer();
accessorProbe.reset(buf);
if (accessorProbe.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
}
inBuffer.reset();
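// Estimate the hash table size for the per-partition joins that follow.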
int tableSize = -1;
if (state.memoryForHashtable == 0) {
tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
} else {
tableSize = (int) (memsize * recordsPerFrame * factor);
}
ISerializableTable table = new SerializableHashTable(tableSize, ctx);
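// For each spilled partition pair, rebuild an in-memory hash table from the build
// run file and stream the corresponding probe run file through it.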
for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
RunFileWriter buildWriter = buildWriters[partitionid];
RunFileWriter probeWriter = probeWriters[partitionid];
if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
table.reset();
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
nullWriters1, table, predEvaluator);
if (buildWriter != null) {
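// Rebuild the table from the spilled build partition; each frame is copied because
// inBuffer is reused by the reader while the joiner holds onto the build frames.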
RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
buildReader.open();
while (buildReader.nextFrame(inBuffer)) {
ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
joiner.build(copyBuffer);
inBuffer.reset();
}
buildReader.close();
}
// Probe: stream the spilled probe partition through the rebuilt table.
RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
probeReader.open();
while (probeReader.nextFrame(inBuffer)) {
joiner.join(inBuffer.getBuffer(), writer);
inBuffer.reset();
}
probeReader.close();
joiner.closeJoin(writer);
}
}
} finally {
writer.close();
}
}
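// Closes the probe-side run file for the given partition, if one was created.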
private void closeWriter(int i) throws HyracksDataException {
RunFileWriter writer = probeWriters[i];
if (writer != null) {
writer.close();
}
}
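// Lazily creates the probe-side run file for the given partition and appends the frame.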
private void write(int i, ByteBuffer head) throws HyracksDataException {
RunFileWriter writer = probeWriters[i];
if (writer == null) {
FileReference file = ctx
.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName());
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
probeWriters[i] = writer;
}
writer.nextFrame(head);
}
@Override
public void fail() throws HyracksDataException {
writer.fail();
}
};
return op;
}