in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java [170:346]
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
: predEvaluatorFactory.createPredicateEvaluator());
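// Build-side pushable: consumes the build input, keeps tuples that hash into the
// resident partition in an in-memory hash table, and spills the rest to run files.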
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
hashFunctionFactories).createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender();
private final FrameTupleAppender ftappender = new FrameTupleAppender();
private IFrame[] bufferForPartitions;
private final IFrame inBuffer = new VSizeFrame(ctx);
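// Flush any tuples still staged for the in-memory joiner, write out every
// non-empty partition buffer, close the run-file writers, and publish the
// task state for the probe activity.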
@Override
public void close() throws HyracksDataException {
if (state.memoryForHashtable != 0) {
    build(inBuffer.getBuffer());
}
for (int i = 0; i < state.nPartitions; i++) {
ByteBuffer buf = bufferForPartitions[i].getBuffer();
accessorBuild.reset(buf);
if (accessorBuild.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
}
ctx.setStateObject(state);
}
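// memoryForHashtable == memsize - 2 marks the pure in-memory case set up in
// open(); there, frames bypass partitioning and go straight to the joiner.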
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (state.memoryForHashtable != memsize - 2) {
accessorBuild.reset(buffer);
int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
int entry;
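// memoryForHashtable == 0 means pure Grace partitioning: every tuple spills.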
if (state.memoryForHashtable == 0) {
entry = hpcBuild.partition(accessorBuild, i, state.nPartitions);
boolean newBuffer = false;
IFrame bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi.getBuffer());
bufBi.reset();
newBuffer = true;
}
}
} else {
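// Hybrid case: hash over the resident slots plus the spilled partitions.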
entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
if (entry < state.memoryForHashtable) {
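// Resident partition: stage the tuple through ftappender into the in-memory joiner.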
while (true) {
if (!ftappender.append(accessorBuild, i)) {
build(inBuffer.getBuffer());
ftappender.reset(inBuffer, true);
} else {
break;
}
}
} else {
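// Spilled partition: fold the hash value onto the partition range and buffer it.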
entry %= state.nPartitions;
boolean newBuffer = false;
IFrame bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi.getBuffer());
bufBi.reset();
newBuffer = true;
}
}
}
}
}
} else {
build(buffer);
}
}
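// The joiner keeps a reference to the frame it is given, so hand it a copy.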
private void build(ByteBuffer inBuffer) throws HyracksDataException {
ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.capacity());
FrameUtils.copyAndFlip(inBuffer, copyBuffer);
state.joiner.build(copyBuffer);
}
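// Size the memory layout: decide how many partitions spill to disk and how
// many frames remain for the in-memory hash table (the arithmetic holds two
// frames back from the budget).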
@Override
public void open() throws HyracksDataException {
if (memsize > 1) {
if (memsize > inputsize0) {
state.nPartitions = 0;
} else {
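// Classic hybrid hash join sizing: ceil((build frames * fudge factor /
// node partitions - memory) / (memory - 1)) spilled partitions.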
state.nPartitions = (int) (Math
.ceil((inputsize0 * factor / nPartitions - memsize) / (memsize - 1)));
}
if (state.nPartitions <= 0) {
// becomes in-memory HJ
state.memoryForHashtable = memsize - 2;
state.nPartitions = 0;
} else {
state.memoryForHashtable = memsize - state.nPartitions - 2;
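// Too many partitions to leave room for a hash table: fall back to pure
// Grace partitioning with roughly sqrt(build size) partitions.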
if (state.memoryForHashtable < 0) {
state.memoryForHashtable = 0;
state.nPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
}
}
} else {
throw new HyracksDataException("not enough memory");
}
ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
.createPartitioner();
ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
.createPartitioner();
int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
ISerializableTable table = new SerializableHashTable(tableSize, ctx);
state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
isLeftOuter, nullWriters1, table, predEvaluator);
bufferForPartitions = new IFrame[state.nPartitions];
state.fWriters = new RunFileWriter[state.nPartitions];
for (int i = 0; i < state.nPartitions; i++) {
bufferForPartitions[i] = new VSizeFrame(ctx);
}
ftappender.reset(inBuffer, true);
}
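// Nothing to undo on failure; run files are created as managed workspace
// files, so cleanup is left to the joblet.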
@Override
public void fail() throws HyracksDataException {
}
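// Close the run-file writer for partition i, if the partition ever spilled.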
private void closeWriter(int i) throws HyracksDataException {
RunFileWriter writer = state.fWriters[i];
if (writer != null) {
writer.close();
}
}
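// Lazily create a run-file writer for partition i on its first spilled frame.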
private void write(int i, ByteBuffer head) throws HyracksDataException {
RunFileWriter writer = state.fWriters[i];
if (writer == null) {
FileReference file = ctx.getJobletContext()
.createManagedWorkspaceFile(BuildAndPartitionActivityNode.class.getSimpleName());
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
state.fWriters[i] = writer;
}
writer.nextFrame(head);
}
};
return op;
}