in plugins/transforms/mergejoin/src/main/java/org/apache/hop/pipeline/transforms/mergejoin/MergeJoin.java [63:424]
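/**
 * Reads rows from the two input streams and merges them according to the configured
 * join type. Both input streams must be sorted ascending on their respective join
 * key fields for the merge to produce correct results.
 */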
public boolean processRow() throws HopException {
int compare;
if (first) {
first = false;
// Find the RowSet to read from
//
List<IStream> infoStreams = meta.getTransformIOMeta().getInfoStreams();
data.oneRowSet = findInputRowSet(infoStreams.get(0).getTransformName());
if (data.oneRowSet == null) {
throw new HopException(
BaseMessages.getString(
PKG,
"MergeJoin.Exception.UnableToFindSpecifiedTransform",
infoStreams.get(0).getTransformName()));
}
data.twoRowSet = findInputRowSet(infoStreams.get(1).getTransformName());
if (data.twoRowSet == null) {
throw new HopException(
BaseMessages.getString(
PKG,
"MergeJoin.Exception.UnableToFindSpecifiedTransform",
infoStreams.get(1).getTransformName()));
}
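// Read the first row from each stream. If a stream is already empty we still need
// its row metadata (taken from the pipeline definition) to build the output layout.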
data.one = getRowFrom(data.oneRowSet);
if (data.one != null) {
data.oneMeta = data.oneRowSet.getRowMeta();
} else {
// Stream one is empty: take its row layout from the pipeline metadata instead.
data.oneMeta =
getPipelineMeta().getTransformFields(this, infoStreams.get(0).getTransformName());
}
data.two = getRowFrom(data.twoRowSet);
if (data.two != null) {
data.twoMeta = data.twoRowSet.getRowMeta();
} else {
// Stream two is empty: take its row layout from the pipeline metadata instead.
data.twoMeta =
getPipelineMeta().getTransformFields(this, infoStreams.get(1).getTransformName());
}
// Cache the output row layout: the concatenation of oneMeta and twoMeta.
//
data.outputRowMeta = new RowMeta();
data.outputRowMeta.mergeRowMeta(data.oneMeta.clone());
data.outputRowMeta.mergeRowMeta(data.twoMeta.clone());
if (data.one != null) {
// Find the key indexes:
data.keyNrs1 = new int[meta.getKeyFields1().size()];
for (int i = 0; i < data.keyNrs1.length; i++) {
data.keyNrs1[i] = data.oneMeta.indexOfValue(meta.getKeyFields1().get(i));
if (data.keyNrs1[i] < 0) {
String message =
BaseMessages.getString(
PKG,
"MergeJoin.Exception.UnableToFindFieldInReferenceStream",
meta.getKeyFields1().get(i));
logError(message);
throw new HopTransformException(message);
}
}
}
if (data.two != null) {
// Find the key indexes:
data.keyNrs2 = new int[meta.getKeyFields2().size()];
for (int i = 0; i < data.keyNrs2.length; i++) {
data.keyNrs2[i] = data.twoMeta.indexOfValue(meta.getKeyFields2().get(i));
if (data.keyNrs2[i] < 0) {
String message =
BaseMessages.getString(
PKG,
"MergeJoin.Exception.UnableToFindFieldInReferenceStream",
meta.getKeyFields2().get(i));
logError(message);
throw new HopTransformException(message);
}
}
}
// one_dummy: an all-null row with the full output width, used as the template for
// output rows when the first stream has no matching row.
data.one_dummy = RowDataUtil.allocateRowData(data.oneMeta.size() + data.twoMeta.size());
// two_dummy: an all-null row with the width of the second stream, appended when the
// second stream has no matching row.
data.two_dummy = new Object[data.twoMeta.size()];
}
if (isRowLevel()) {
// Guard against null rows: either stream may already be exhausted at this point.
logRowlevel(
BaseMessages.getString(
PKG,
"MergeJoin.Log.DataInfo",
(data.one == null ? "<null>" : data.oneMeta.getString(data.one))
+ (data.two == null ? "<null>" : data.twoMeta.getString(data.two))));
}
/*
 * We can stop processing if any of the following is true:
 *   a) Both streams are empty.
 *   b) The first stream is empty and the join type is INNER or LEFT OUTER
 *      (data.one_optional is false).
 *   c) The second stream is empty and the join type is INNER or RIGHT OUTER
 *      (data.two_optional is false).
 */
if ((data.one == null && data.two == null)
|| (data.one == null && !data.one_optional)
|| (data.two == null && !data.two_optional)) {
// Before we stop processing, we have to make sure that all rows from both input streams are
// depleted!
// If we don't do this, the pipeline can stall.
//
while (data.one != null && !isStopped()) {
data.one = getRowFrom(data.oneRowSet);
}
while (data.two != null && !isStopped()) {
data.two = getRowFrom(data.twoRowSet);
}
setOutputDone();
return false;
}
if (data.one == null) {
compare = -1;
} else {
if (data.two == null) {
compare = 1;
} else {
int cmp =
data.oneMeta.compare(data.one, data.twoMeta, data.two, data.keyNrs1, data.keyNrs2);
compare = cmp > 0 ? 1 : cmp < 0 ? -1 : 0;
}
}
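// compare == 0: the keys match; compare == 1: the first stream's key is greater
// (or the second stream is exhausted); compare == -1: the second stream's key is
// greater (or the first stream is exhausted).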
switch (compare) {
case 0:
/*
 * We've got a match. To handle duplicate keys correctly, we do the following:
 *   1. Read the next record from both streams.
 *   2. If either next record repeats the current key, we have duplicates: collect
 *      all rows sharing the key from each stream and push the Cartesian product of
 *      the two groups to the output.
 *   3. Otherwise, just push the single combined row to the output.
 */
data.one_next = getRowFrom(data.oneRowSet);
data.two_next = getRowFrom(data.twoRowSet);
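// compare1/compare2 == 0 means the next row repeats the current key in that stream.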
int compare1 =
(data.one_next == null)
? -1
: data.oneMeta.compare(data.one, data.one_next, data.keyNrs1, data.keyNrs1);
int compare2 =
(data.two_next == null)
? -1
: data.twoMeta.compare(data.two, data.two_next, data.keyNrs2, data.keyNrs2);
if (compare1 == 0 || compare2 == 0) { // Duplicate keys
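// (Re)use the buffers that collect the groups of rows sharing the current key.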
if (data.ones == null) {
data.ones = new ArrayList<>();
} else {
data.ones.clear();
}
if (data.twos == null) {
data.twos = new ArrayList<>();
} else {
data.twos.clear();
}
data.ones.add(data.one);
if (compare1 == 0) {
// First stream has duplicates
data.ones.add(data.one_next);
while (!isStopped()) {
data.one_next = getRowFrom(data.oneRowSet);
if (data.one_next == null
|| data.oneMeta.compare(data.one, data.one_next, data.keyNrs1, data.keyNrs1) != 0) {
break;
}
data.ones.add(data.one_next);
}
if (isStopped()) {
return false;
}
}
data.twos.add(data.two);
if (compare2 == 0) { // Second stream has duplicates
data.twos.add(data.two_next);
while (!isStopped()) {
data.two_next = getRowFrom(data.twoRowSet);
if (data.two_next == null
|| data.twoMeta.compare(data.two, data.two_next, data.keyNrs2, data.keyNrs2) != 0) {
break;
}
data.twos.add(data.two_next);
}
if (isStopped()) {
return false;
}
}
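// Push the Cartesian product of the two duplicate groups to the output.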
for (Iterator<Object[]> oneIter = data.ones.iterator();
oneIter.hasNext() && !isStopped(); ) {
Object[] one = oneIter.next();
for (Iterator<Object[]> twoIter = data.twos.iterator();
twoIter.hasNext() && !isStopped(); ) {
Object[] two = twoIter.next();
Object[] oneBig =
RowDataUtil.createResizedCopy(one, data.oneMeta.size() + data.twoMeta.size());
Object[] combi = RowDataUtil.addRowData(oneBig, data.oneMeta.size(), two);
putRow(data.outputRowMeta, combi);
}
// Remove the rows as we merge them to keep the overall memory footprint minimal
oneIter.remove();
}
data.twos.clear();
} else {
// No duplicates
Object[] outputRowData = RowDataUtil.addRowData(data.one, data.oneMeta.size(), data.two);
putRow(data.outputRowMeta, outputRowData);
}
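// Continue with the rows that follow the matched key group in each stream.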
data.one = data.one_next;
data.two = data.two_next;
break;
case 1:
/*
 * The first stream's key is greater than the second stream's key. This means:
 *   a) the second stream's current key is missing from the first stream, or
 *   b) the second stream may have finished.
 * So, if a FULL or RIGHT OUTER join is requested and the second stream still has a
 * row, we push a record to the output with only the second stream's values populated.
 * Next, if the second stream is not finished, we get a row from it; otherwise we
 * signal that we are done.
 */
if (data.one_optional) {
if (data.two != null) {
Object[] outputRowData =
RowDataUtil.createResizedCopy(data.one_dummy, data.outputRowMeta.size());
outputRowData = RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two);
putRow(data.outputRowMeta, outputRowData);
data.two = getRowFrom(data.twoRowSet);
} else if (!data.two_optional) {
/*
* If we are doing right outer join then we are done since there are no more rows in the second set
*/
// Before we stop processing, we have to make sure that all rows from both input streams
// are depleted!
// If we don't do this, the pipeline can stall.
//
while (data.one != null && !isStopped()) {
data.one = getRowFrom(data.oneRowSet);
}
while (data.two != null && !isStopped()) {
data.two = getRowFrom(data.twoRowSet);
}
setOutputDone();
return false;
} else {
/*
 * We are doing a FULL OUTER join, so push the first stream's row (padded with nulls
 * for the second stream) to the output and get the next row from the first stream.
 */
Object[] outputRowData =
RowDataUtil.createResizedCopy(data.one, data.outputRowMeta.size());
outputRowData =
RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two_dummy);
putRow(data.outputRowMeta, outputRowData);
data.one = getRowFrom(data.oneRowSet);
}
} else if (data.two == null && data.two_optional) {
/*
 * We have reached the end of stream 2 and there are records present in the first
 * stream. Since one_optional is false here, the join is a LEFT OUTER join. So,
 * create a row with just the values of the first stream and push it forward.
 */
Object[] outputRowData =
RowDataUtil.createResizedCopy(data.one, data.outputRowMeta.size());
outputRowData =
RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two_dummy);
putRow(data.outputRowMeta, outputRowData);
data.one = getRowFrom(data.oneRowSet);
} else if (data.two != null) {
/*
* We are doing an inner or left outer join, so throw this row away from the 2nd stream
*/
data.two = getRowFrom(data.twoRowSet);
}
break;
case -1:
/*
 * The second stream's key is greater than the first stream's key. This means:
 *   a) the first stream's current key is missing from the second stream, or
 *   b) the first stream may have finished.
 * So, if a FULL or LEFT OUTER join is requested and the first stream still has a
 * row, we push a record to the output with only the first stream's values populated.
 * Next, if the first stream is not finished, we get a row from it; otherwise we
 * signal that we are done.
 */
if (data.two_optional) {
if (data.one != null) {
Object[] outputRowData =
RowDataUtil.createResizedCopy(data.one, data.outputRowMeta.size());
outputRowData =
RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two_dummy);
putRow(data.outputRowMeta, outputRowData);
data.one = getRowFrom(data.oneRowSet);
} else if (!data.one_optional) {
/*
* We are doing a left outer join and there are no more rows in the first stream; so we are done
*/
// Before we stop processing, we have to make sure that all rows from both input streams
// are depleted!
// If we don't do this, the pipeline can stall.
//
while (data.one != null && !isStopped()) {
data.one = getRowFrom(data.oneRowSet);
}
while (data.two != null && !isStopped()) {
data.two = getRowFrom(data.twoRowSet);
}
setOutputDone();
return false;
} else {
/*
 * We are doing a FULL OUTER join, so push the second stream's row (padded with nulls
 * for the first stream) to the output and get the next row from the second stream.
 */
Object[] outputRowData =
RowDataUtil.createResizedCopy(data.one_dummy, data.outputRowMeta.size());
outputRowData = RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two);
putRow(data.outputRowMeta, outputRowData);
data.two = getRowFrom(data.twoRowSet);
}
} else if (data.one == null && data.one_optional) {
/*
 * We have reached the end of stream 1 and there are records present in the second
 * stream. Since two_optional is false here, the join is a RIGHT OUTER join. So,
 * create a row with just the values of the second stream and push it forward.
 */
Object[] outputRowData =
RowDataUtil.createResizedCopy(data.one_dummy, data.outputRowMeta.size());
outputRowData = RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two);
putRow(data.outputRowMeta, outputRowData);
data.two = getRowFrom(data.twoRowSet);
} else if (data.one != null) {
/*
* We are doing an inner or right outer join so a non-matching row in the first stream is of no use to us -
* throw it away and get the next row
*/
data.one = getRowFrom(data.oneRowSet);
}
break;
default:
logDebug("We shouldn't be here!!");
// Make sure we do not go into an infinite loop by continuing to read data
data.one = getRowFrom(data.oneRowSet);
data.two = getRowFrom(data.twoRowSet);
break;
}
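// Give periodic feedback on the number of lines read.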
if (checkFeedback(getLinesRead())) {
logBasic(BaseMessages.getString(PKG, "MergeJoin.LineNumber") + getLinesRead());
}
return true;
}