in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java [179:361]
public IFrameWriter getInputFrameWriter(final int index) {
return new IFrameWriter() {
/**
 * Opens the shared downstream writer. Only the writer for input 0 does so;
 * the writers for every other input leave it untouched.
 *
 * @throws HyracksDataException propagated from {@code writer.open()}
 */
@Override
public void open() throws HyracksDataException {
    if (index != 0) {
        return;
    }
    writer.open();
}
/**
 * Accepts one frame for input {@code index} and blocks until that frame has
 * been marked consumed or the operator is done.
 *
 * All work happens while holding the monitor of the enclosing
 * {@code IntersectOperatorNodePushable}. Inputs other than 0 register their
 * frame and wait; input 0 is the one that calls {@code intersectAllInputs()}
 * once no bit is set in {@code consumed}.
 *
 * @param buffer the incoming frame; it is handed to {@code refAccessor[index]}
 * @throws HyracksDataException if waiting is interrupted or the intersection fails
 */
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
synchronized (IntersectOperatorNodePushable.this) {
// Once done is set, further frames are dropped without being looked at.
if (done) {
return;
}
// Register the new frame for this input: point the accessor at it, restart
// the tuple cursor, and clear this input's consumed bit.
refAccessor[index].reset(buffer);
tupleIndexMarker[index] = 0;
consumed.clear(index);
if (index != 0) {
// If this was the last outstanding input, wake the waiting threads
// (input 0 waits on exactly this condition below).
if (allInputArrived()) {
IntersectOperatorNodePushable.this.notifyAll();
}
// Block until this input's frame is marked consumed or the operator is done.
while (!consumed.get(index) && !done) {
waitOrHyracksException();
}
} else { //(index == 0)
// Input 0 drives the intersection and keeps going until its own frame
// is marked consumed.
while (!consumed.get(0)) {
// Wait until no consumed bit is set, i.e. every input has a frame registered.
while (!allInputArrived() && !done) {
waitOrHyracksException();
}
if (done) {
break;
}
intersectAllInputs();
// Wake inputs whose frames were marked consumed so they can return.
IntersectOperatorNodePushable.this.notifyAll();
}
}
}
}
/**
 * Waits on the monitor of the enclosing {@code IntersectOperatorNodePushable}
 * until another thread notifies it. The caller must already hold that monitor.
 *
 * @throws HyracksDataException wrapping the {@link InterruptedException} if the
 *         waiting thread is interrupted; the thread's interrupt status is
 *         restored before the exception is thrown
 */
private void waitOrHyracksException() throws HyracksDataException {
    try {
        IntersectOperatorNodePushable.this.wait();
    } catch (InterruptedException e) {
        // wait() clears the interrupt status when it throws; restore it so code
        // further up the stack can still observe that the thread was interrupted.
        Thread.currentThread().interrupt();
        throw new HyracksDataException(e);
    }
}
/**
 * Reports whether no bit is currently set in {@code consumed}, i.e. every
 * input has cleared its consumed bit.
 *
 * @return {@code true} when {@code consumed} has no set bit
 */
private boolean allInputArrived() {
    final int firstConsumed = consumed.nextSetBit(0);
    return firstConsumed < 0;
}
/**
 * Intersects the frames currently registered for all inputs, starting from each
 * input's tuple cursor, until at least one input's frame is marked consumed.
 *
 * Each pass picks the input chosen by {@code findMaxInput()} as the pivot,
 * moves every other input's cursor past tuples that compare lower than the
 * pivot's current tuple, and then either emits the projected pivot tuple (when
 * every input compared equal) or advances the pivot (when some input compared
 * higher). After the loop the appender is handed to the writer via
 * {@code appender.write(writer, true)}.
 *
 * @throws HyracksDataException propagated from comparison or from writing output
 */
private void intersectAllInputs() throws HyracksDataException {
    do {
        final int pivot = findMaxInput();
        int agreeing = 1; // the pivot trivially agrees with itself
        boolean pivotIsBehind = false;
        for (int input = 0; input < inputArity; input++) {
            if (input == pivot) {
                continue;
            }
            final int outcome = skipTuplesBelowPivot(input, pivot);
            if (outcome == 0) {
                agreeing++;
            } else if (outcome > 0) {
                pivotIsBehind = true;
            }
            flagIfExhausted(input);
        }
        if (agreeing == inputArity) {
            // Every input is positioned on an equal tuple: emit it once and step all cursors.
            FrameUtils.appendProjectionToWriter(writer, appender, refAccessor[pivot],
                    tupleIndexMarker[pivot], projectFields[pivot]);
            for (int input = 0; input < inputArity; input++) {
                stepCursor(input);
            }
        } else if (pivotIsBehind) {
            stepCursor(pivot);
        }
    } while (consumed.nextSetBit(0) < 0);
    appender.write(writer, true);
}

/**
 * Advances {@code input}'s cursor while its current tuple compares lower than
 * the pivot's current tuple.
 *
 * @return the first non-negative comparison result seen (0 for equal, positive
 *         when the input's tuple is higher), or a negative value if the input's
 *         frame ran out of tuples first
 */
private int skipTuplesBelowPivot(int input, int pivot) throws HyracksDataException {
    while (tupleIndexMarker[input] < refAccessor[input].getTupleCount()) {
        final int cmp = compare(input, refAccessor[input], tupleIndexMarker[input], pivot,
                refAccessor[pivot], tupleIndexMarker[pivot]);
        if (cmp >= 0) {
            return cmp;
        }
        tupleIndexMarker[input]++;
    }
    return -1;
}

/** Moves {@code input}'s cursor forward by one tuple and flags the frame if that ran it out. */
private void stepCursor(int input) {
    tupleIndexMarker[input]++;
    flagIfExhausted(input);
}

/** Sets {@code input}'s bit in {@code consumed} when its cursor is at or past the frame's tuple count. */
private void flagIfExhausted(int input) {
    if (tupleIndexMarker[input] >= refAccessor[input].getTupleCount()) {
        consumed.set(input);
    }
}
/**
 * Compares tuple {@code tid1} of {@code input1} with tuple {@code tid2} of
 * {@code input2} on their projected fields.
 *
 * The values returned by {@code getFirstNorm} are compared first; only when
 * they are equal are the {@code comparators} applied, in order, to the
 * corresponding projected fields of both tuples.
 *
 * @return -1 or 1 when the first-key norms differ; otherwise the first non-zero
 *         comparator result, or 0 if every comparator reports equality
 * @throws HyracksDataException propagated from the comparators
 */
private int compare(int input1, FrameTupleAccessor frameTupleAccessor1, int tid1, int input2,
        FrameTupleAccessor frameTupleAccessor2, int tid2) throws HyracksDataException {
    final int leftNorm = getFirstNorm(input1, frameTupleAccessor1, tid1);
    final int rightNorm = getFirstNorm(input2, frameTupleAccessor2, tid2);
    if (leftNorm != rightNorm) {
        return leftNorm < rightNorm ? -1 : 1;
    }
    int result = 0;
    for (int k = 0; k < comparators.length && result == 0; k++) {
        final int leftField = projectFields[input1][k];
        final int rightField = projectFields[input2][k];
        result = comparators[k].compare(frameTupleAccessor1.getBuffer().array(),
                frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, leftField),
                frameTupleAccessor1.getFieldLength(tid1, leftField),
                frameTupleAccessor2.getBuffer().array(),
                frameTupleAccessor2.getAbsoluteFieldStartOffset(tid2, rightField),
                frameTupleAccessor2.getFieldLength(tid2, rightField));
    }
    return result;
}
/**
 * Computes the normalized value of the first projected field of tuple
 * {@code tid1} of input {@code inputId1}.
 *
 * @return the result of {@code firstKeyNormalizerComputer.normalize(...)} on
 *         that field's bytes, or 0 when no normalizer computer is configured
 */
private int getFirstNorm(int inputId1, FrameTupleAccessor frameTupleAccessor1, int tid1) {
    if (firstKeyNormalizerComputer == null) {
        return 0;
    }
    final int firstKeyField = projectFields[inputId1][0];
    return firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(),
            frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, firstKeyField),
            frameTupleAccessor1.getFieldLength(tid1, firstKeyField));
}
/**
 * Finds the input whose tuple at its current cursor position compares highest.
 * When several inputs compare equal, the lowest input index among them wins,
 * because a candidate only replaces the current best when the best compares
 * strictly lower.
 *
 * @return the index of the input holding the highest current tuple
 * @throws HyracksDataException propagated from {@code compare}
 */
private int findMaxInput() throws HyracksDataException {
    int best = 0;
    int candidate = 1;
    while (candidate < inputArity) {
        final boolean bestIsLower = compare(best, refAccessor[best], tupleIndexMarker[best],
                candidate, refAccessor[candidate], tupleIndexMarker[candidate]) < 0;
        if (bestIsLower) {
            best = candidate;
        }
        candidate++;
    }
    return best;
}
/**
 * Reports failure of this input by delegating to
 * {@code clearStateWith(ACTION.FAILED)}.
 *
 * @throws HyracksDataException propagated from {@code clearStateWith}
 */
@Override
public void fail() throws HyracksDataException {
clearStateWith(ACTION.FAILED);
}
/**
 * Closes this input by delegating to {@code clearStateWith(ACTION.CLOSE)}.
 *
 * @throws HyracksDataException propagated from {@code clearStateWith}
 */
@Override
public void close() throws HyracksDataException {
clearStateWith(ACTION.CLOSE);
}
/**
 * Ends this input's participation in the intersection.
 *
 * Under the monitor of the enclosing {@code IntersectOperatorNodePushable},
 * input 0 first forwards {@code action} to the downstream writer. Then, unless
 * the operator is already done, this input's bit in {@code consumed} is set,
 * its accessor reference is released, {@code done} is set, and all waiting
 * threads are woken.
 *
 * The shared-state update runs in a {@code finally} block so that an exception
 * from the downstream writer cannot leave other input threads waiting forever
 * on a {@code done} flag that is never set.
 *
 * @param action whether the downstream writer should be closed or failed
 * @throws HyracksDataException propagated from the downstream writer
 */
private void clearStateWith(ACTION action) throws HyracksDataException {
    synchronized (IntersectOperatorNodePushable.this) {
        try {
            // Only input 0 talks to the downstream writer.
            if (index == 0) {
                doAction(action);
            }
        } finally {
            if (!done) {
                consumed.set(index);
                refAccessor[index] = null;
                done = true;
                IntersectOperatorNodePushable.this.notifyAll();
            }
        }
    }
}
/**
 * Forwards a terminal action to the downstream writer: {@code writer.close()}
 * for {@code CLOSE}, {@code writer.fail()} for {@code FAILED}. Any other value
 * results in no call.
 *
 * @param action the terminal action to apply
 * @throws HyracksDataException propagated from the downstream writer
 */
private void doAction(ACTION action) throws HyracksDataException {
    switch (action) {
        case FAILED:
            writer.fail();
            return;
        case CLOSE:
            writer.close();
            return;
        default:
            return;
    }
}
};
}