public boolean processRow()

in plugins/transforms/multimerge/src/main/java/org/apache/hop/pipeline/transforms/multimerge/MultiMergeJoin.java [199:390]


  /**
   * Performs one merge-join step: takes the input stream(s) whose current row carries the smallest
   * join key, gathers every consecutive row with that same key from those streams, and writes the
   * cartesian product of the gathered rows to the output.
   *
   * <p>Two modes are handled, selected by {@code data.optional}:
   *
   * <ul>
   *   <li>{@code data.optional == true}: a result is produced even when only some streams carry the
   *       smallest key; streams without a matching row contribute {@code data.dummy[i]}.
   *   <li>{@code data.optional == false}: a result is produced only when every stream carries the
   *       smallest key; otherwise the rows with that key are skipped.
   * </ul>
   *
   * @return {@code true} when this method should be called again, {@code false} when the inputs are
   *     exhausted, the first-row initialisation failed, or the transform was stopped
   * @throws HopException when reading, comparing or writing a row fails
   */
  public boolean processRow() throws HopException {

    if (first) {
      if (!processFirstRow(meta, data)) {
        setOutputDone();
        return false;
      }
      first = false;
    }

    if (isRowLevel()) {
      // Built with a StringBuilder instead of repeated String concatenation inside the loop.
      // append(String) renders null as "null", exactly like the '+=' it replaces.
      StringBuilder metaString = new StringBuilder();
      metaString.append(
          BaseMessages.getString(
              PKG, "MultiMergeJoin.Log.DataInfo", data.metas[0].getString(data.rows[0]) + ""));
      for (int i = 1; i < data.metas.length; i++) {
        metaString.append(data.metas[i].getString(data.rows[i]));
      }
      logRowlevel(metaString.toString());
    }

    /*
     * We can stop processing if any of the following is true: a) All streams are empty b) Any stream is empty and join
     * type is INNER
     */
    int streamSize = data.metas.length;
    if (data.optional) {
      if (data.queue.isEmpty()) {
        setOutputDone();
        return false;
      }
      int drainSize = pollEntriesWithSmallestKey();
      Object[] row = null;
      // rows from nonempty input streams match: get all equal rows and create result set
      for (int i = 0; i < drainSize; i++) {
        int index = data.drainIndices[i];
        data.results.get(index).add(data.rows[index]);
        // Keep reading while the stream delivers rows with the same key; the first row with a
        // different key (if any) is left in 'row' and re-queued below.
        while (!isStopped()
            && ((row = getRowFrom(data.rowSets[index])) != null
                && data.metas[index].compare(data.rows[index], row, data.keyNrs[index]) == 0)) {
          data.results.get(index).add(row);
        }
        if (isStopped()) {
          // NOTE(review): unlike the non-optional branch, setOutputDone() is not called here.
          return false;
        }
        if (row != null) {
          data.queueEntries[index].row = row;
          data.queue.add(data.queueEntries[index]);
        }
      }
      // Streams that did not carry the smallest key contribute their dummy row.
      for (int i = 0; i < streamSize; i++) {
        if (data.results.get(i).isEmpty()) {
          data.results.get(i).add(data.dummy[i]);
        }
      }
      putCartesianProductOfResults(streamSize);
    } else {
      if (data.queue.size() < streamSize) {
        // At least one stream has no queued row any more: no further match is possible.
        data.queue.clear();
        // Read the remaining rows of every stream and discard them.
        for (int i = 0; i < streamSize; i++) {
          while (data.rows[i] != null && !isStopped()) {
            try {
              data.rows[i] = getRowFrom(data.rowSets[i]);
            } catch (Exception ignored) {
              // Draining is best effort: a failure while discarding rows only ends the drain of
              // this stream.
              break;
            }
          }
        }
        setOutputDone();
        return false;
      }

      int drainSize = pollEntriesWithSmallestKey();
      Object[] row = null;
      if (data.queue.isEmpty()) {
        // rows from all input streams match: get all equal rows and create result set
        for (int i = 0; i < streamSize; i++) {
          data.results.get(i).add(data.rows[i]);
          while (!isStopped()
              && ((row = getRowFrom(data.rowSets[i])) != null
                  && data.metas[i].compare(data.rows[i], row, data.keyNrs[i]) == 0)) {
            data.results.get(i).add(row);
          }
          if (isStopped()) {
            setOutputDone();
            return false;
          }
          if (row != null) {
            data.queueEntries[i].row = row;
            data.queue.add(data.queueEntries[i]);
          }
        }
        putCartesianProductOfResults(streamSize);
      } else {
        // mismatch found and no results can be generated

        for (int i = 0; i < drainSize; i++) {
          int index = data.drainIndices[i];
          // Skip every further row that still carries the unmatched key.
          while ((row = getRowFrom(data.rowSets[index])) != null
              && data.metas[index].compare(data.rows[index], row, data.keyNrs[index]) == 0) {
            if (isStopped()) {
              break;
            }
          }
          if (isStopped() || row == null) {
            // An exhausted stream is not re-queued, and neither are the drained streams after it;
            // presumably the queue-size check at the top of this branch then ends processing on
            // the next call.
            break;
          }
          data.queueEntries[index].row = row;
          data.queue.add(data.queueEntries[index]);
        }
        if (isStopped()) {
          setOutputDone();
          return false;
        }
      }
    }
    if (checkFeedback(getLinesRead())) {
      logBasic(BaseMessages.getString(PKG, "MultiMergeJoin.LineNumber") + getLinesRead());
    }
    return true;
  }

  /**
   * Removes the smallest entry from {@code data.queue} together with every further entry that the
   * queue's comparator reports as equal to it. The row of each removed entry is stored in {@code
   * data.rows} at the entry's stream index, and the stream indices are recorded in {@code
   * data.drainIndices[0..n-1]}.
   *
   * <p>The queue must not be empty when this method is called.
   *
   * @return the number of entries removed, i.e. the number of valid leading elements in {@code
   *     data.drainIndices}
   */
  private int pollEntriesWithSmallestKey() {
    MultiMergeJoinData.QueueEntry minEntry = data.queue.poll();
    int drainSize = 1;
    data.rows[minEntry.index] = minEntry.row;
    data.drainIndices[0] = minEntry.index;
    MultiMergeJoinData.QueueComparator comparator =
        (MultiMergeJoinData.QueueComparator) data.queue.comparator();
    while (!data.queue.isEmpty() && comparator.compare(data.queue.peek(), minEntry) == 0) {
      MultiMergeJoinData.QueueEntry entry = data.queue.poll();
      data.rows[entry.index] = entry.row;
      data.drainIndices[drainSize++] = entry.index;
    }
    return drainSize;
  }

  /**
   * Writes one output row for every combination of the rows collected in {@code data.results} (one
   * list per stream) and clears the lists afterwards. Every list must hold at least one row.
   *
   * <p>{@code data.drainIndices} is reused as a mixed-radix counter: position {@code i} is the
   * index of the row currently selected from stream {@code i}. {@code data.rows} is overwritten
   * with the selected rows.
   *
   * @param streamSize the number of input streams
   * @throws HopException when a row cannot be written to the output
   */
  private void putCartesianProductOfResults(int streamSize) throws HopException {
    for (int i = 0; i < streamSize; i++) {
      data.drainIndices[i] = 0;
    }

    while (true) {
      for (int i = 0; i < streamSize; i++) {
        data.rows[i] = data.results.get(i).get(data.drainIndices[i]);
      }
      putRow(data.outputRowMeta, RowDataUtil.createResizedCopy(data.rows, data.rowLengths));

      // Advance the counter: bump the lowest position and carry into the next one on overflow.
      int current = 0;
      while (current < streamSize
          && ++data.drainIndices[current] >= data.results.get(current).size()) {
        data.drainIndices[current] = 0;
        current++;
      }
      if (current >= streamSize) {
        // The carry ran past the last stream: every combination has been written.
        break;
      }
    }

    for (int i = 0; i < streamSize; i++) {
      data.results.get(i).clear();
    }
  }