in plugins/transforms/multimerge/src/main/java/org/apache/hop/pipeline/transforms/multimerge/MultiMergeJoin.java [199:390]
public boolean processRow() throws HopException {
  // Handles one key group per call: the queue entries that compare equal to the smallest queued
  // entry are polled, every following row with the same key is read from those streams, and the
  // cross product of the collected rows is written to the output.
  //
  // Returns true when the caller should invoke this method again, and false once processing has
  // ended (first-row setup failed, the input is exhausted, or the transform was stopped).
  if (first) {
    if (!processFirstRow(meta, data)) {
      setOutputDone();
      return false;
    }
    first = false;
  }

  if (isRowLevel()) {
    // Assemble the message with a StringBuilder rather than concatenating Strings in a loop.
    StringBuilder rowDump = new StringBuilder();
    rowDump.append(
        BaseMessages.getString(
            PKG, "MultiMergeJoin.Log.DataInfo", data.metas[0].getString(data.rows[0]) + ""));
    for (int i = 1; i < data.metas.length; i++) {
      rowDump.append(data.metas[i].getString(data.rows[i]));
    }
    logRowlevel(rowDump.toString());
  }

  /*
   * We can stop processing if any of the following is true: a) All streams are empty b) Any stream is empty and join
   * type is INNER
   */
  int streamSize = data.metas.length;
  if (data.optional) {
    if (data.queue.isEmpty()) {
      // No stream has a pending row any more.
      setOutputDone();
      return false;
    }

    int drainSize = pollSmallestKeyGroup();

    // rows from nonempty input streams match: get all equal rows and create result set
    Object[] row = null;
    for (int i = 0; i < drainSize; i++) {
      int index = data.drainIndices[i];
      data.results.get(index).add(data.rows[index]);
      while (!isStopped()
          && ((row = getRowFrom(data.rowSets[index])) != null
              && data.metas[index].compare(data.rows[index], row, data.keyNrs[index]) == 0)) {
        data.results.get(index).add(row);
      }
      if (isStopped()) {
        // NOTE(review): unlike the non-optional path below, this path returns without calling
        // setOutputDone() - confirm that the difference is intended.
        return false;
      }
      if (row != null) {
        // First row with a different key: queue it as this stream's next candidate.
        data.queueEntries[index].row = row;
        data.queue.add(data.queueEntries[index]);
      }
    }

    // Streams that contributed nothing for this key take part with their dummy row.
    for (int i = 0; i < streamSize; i++) {
      if (data.results.get(i).isEmpty()) {
        data.results.get(i).add(data.dummy[i]);
      }
    }
    emitCartesianProduct(streamSize);
  } else {
    if (data.queue.size() < streamSize) {
      // At least one stream has no pending row, so no further match is possible: read the
      // remaining rows of every stream and finish.
      data.queue.clear();
      for (int i = 0; i < streamSize; i++) {
        while (data.rows[i] != null && !isStopped()) {
          try {
            data.rows[i] = getRowFrom(data.rowSets[i]);
          } catch (Exception ignored) {
            // Best effort: a failure while discarding leftover rows only ends the draining of
            // this stream.
            break;
          }
        }
      }
      setOutputDone();
      return false;
    }

    int drainSize = pollSmallestKeyGroup();

    Object[] row = null;
    if (data.queue.isEmpty()) {
      // rows from all input streams match: get all equal rows and create result set
      for (int i = 0; i < streamSize; i++) {
        data.results.get(i).add(data.rows[i]);
        while (!isStopped()
            && ((row = getRowFrom(data.rowSets[i])) != null
                && data.metas[i].compare(data.rows[i], row, data.keyNrs[i]) == 0)) {
          data.results.get(i).add(row);
        }
        if (isStopped()) {
          setOutputDone();
          return false;
        }
        if (row != null) {
          data.queueEntries[i].row = row;
          data.queue.add(data.queueEntries[i]);
        }
      }
      emitCartesianProduct(streamSize);
    } else {
      // mismatch found and no results can be generated
      for (int i = 0; i < drainSize; i++) {
        int index = data.drainIndices[i];
        // Skip every further row of this stream that still carries the unmatched key.
        while ((row = getRowFrom(data.rowSets[index])) != null
            && data.metas[index].compare(data.rows[index], row, data.keyNrs[index]) == 0) {
          if (isStopped()) {
            break;
          }
        }
        if (isStopped() || row == null) {
          // A null row leaves this stream out of the queue, so the next call sees
          // queue.size() < streamSize and finishes.
          break;
        }
        data.queueEntries[index].row = row;
        data.queue.add(data.queueEntries[index]);
      }
      if (isStopped()) {
        setOutputDone();
        return false;
      }
    }
  }

  if (checkFeedback(getLinesRead())) {
    logBasic(BaseMessages.getString(PKG, "MultiMergeJoin.LineNumber") + getLinesRead());
  }
  return true;
}

/**
 * Polls the head of {@code data.queue} together with every further entry that the queue's
 * comparator reports as equal to it. For each polled entry the row is stored in
 * {@code data.rows[entry.index]} and the stream index is recorded in {@code data.drainIndices}.
 *
 * <p>The queue must not be empty when this method is called.
 *
 * @return the number of stream indices written to the front of {@code data.drainIndices}
 * @throws HopException declared for symmetry with the other row-handling methods
 */
private int pollSmallestKeyGroup() throws HopException {
  MultiMergeJoinData.QueueEntry minEntry = data.queue.poll();
  int drainSize = 1;
  data.rows[minEntry.index] = minEntry.row;
  data.drainIndices[0] = minEntry.index;
  MultiMergeJoinData.QueueComparator comparator =
      (MultiMergeJoinData.QueueComparator) data.queue.comparator();
  while (!data.queue.isEmpty() && comparator.compare(data.queue.peek(), minEntry) == 0) {
    MultiMergeJoinData.QueueEntry entry = data.queue.poll();
    data.rows[entry.index] = entry.row;
    data.drainIndices[drainSize++] = entry.index;
  }
  return drainSize;
}

/**
 * Writes one output row for every combination of the rows collected in {@code data.results}
 * (one list per stream) and clears the lists afterwards. {@code data.drainIndices} is reused as
 * the per-stream position counter, with stream 0 advancing fastest.
 *
 * <p>Every list in {@code data.results} must hold at least one row.
 *
 * @param streamSize number of input streams taking part in the join
 * @throws HopException if writing an output row fails
 */
private void emitCartesianProduct(int streamSize) throws HopException {
  for (int i = 0; i < streamSize; i++) {
    data.drainIndices[i] = 0;
  }

  boolean exhausted = false;
  while (!exhausted) {
    for (int i = 0; i < streamSize; i++) {
      data.rows[i] = data.results.get(i).get(data.drainIndices[i]);
    }
    Object[] outputRow = RowDataUtil.createResizedCopy(data.rows, data.rowLengths);
    putRow(data.outputRowMeta, outputRow);

    // Advance the position counters with carry; running past the last stream means every
    // combination has been written.
    int position = 0;
    while (position < streamSize
        && ++data.drainIndices[position] >= data.results.get(position).size()) {
      data.drainIndices[position] = 0;
      position++;
    }
    exhausted = position >= streamSize;
  }

  for (int i = 0; i < streamSize; i++) {
    data.results.get(i).clear();
  }
}