public boolean processRow()

in plugins/transforms/mergejoin/src/main/java/org/apache/hop/pipeline/transforms/mergejoin/MergeJoin.java [63:424]
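
MergeJoin.processRow() implements a classic sorted merge join over two input streams. Both streams must arrive sorted on their respective join keys. Each call compares the current row of each stream and emits combined rows according to the configured join type (INNER, LEFT OUTER, RIGHT OUTER, or FULL OUTER); it returns true while more rows may follow and false once both streams are drained and the output is marked done.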


  public boolean processRow() throws HopException {
    int compare;

    if (first) {
      first = false;

      // Find the RowSet to read from
      //
      List<IStream> infoStreams = meta.getTransformIOMeta().getInfoStreams();

      data.oneRowSet = findInputRowSet(infoStreams.get(0).getTransformName());
      if (data.oneRowSet == null) {
        throw new HopException(
            BaseMessages.getString(
                PKG,
                "MergeJoin.Exception.UnableToFindSpecifiedTransform",
                infoStreams.get(0).getTransformName()));
      }

      data.twoRowSet = findInputRowSet(infoStreams.get(1).getTransformName());
      if (data.twoRowSet == null) {
        throw new HopException(
            BaseMessages.getString(
                PKG,
                "MergeJoin.Exception.UnableToFindSpecifiedTransform",
                infoStreams.get(1).getTransformName()));
      }

      data.one = getRowFrom(data.oneRowSet);
      if (data.one != null) {
        data.oneMeta = data.oneRowSet.getRowMeta();
      } else {
        // data.one is already null here; derive the row metadata from the pipeline definition.
        data.oneMeta =
            getPipelineMeta().getTransformFields(this, infoStreams.get(0).getTransformName());
      }

      data.two = getRowFrom(data.twoRowSet);
      if (data.two != null) {
        data.twoMeta = data.twoRowSet.getRowMeta();
      } else {
        // data.two is already null here; derive the row metadata from the pipeline definition.
        data.twoMeta =
            getPipelineMeta().getTransformFields(this, infoStreams.get(1).getTransformName());
      }

      // Precompute the output row metadata once, for speed: it is simply oneMeta + twoMeta.
      //
      data.outputRowMeta = new RowMeta();
      data.outputRowMeta.mergeRowMeta(data.oneMeta.clone());
      data.outputRowMeta.mergeRowMeta(data.twoMeta.clone());
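      // The output layout is [all fields of stream one][all fields of stream two]; every
      // combined row below is built by writing stream two's values at offset oneMeta.size().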

      if (data.one != null) {
        // Find the key indexes:
        data.keyNrs1 = new int[meta.getKeyFields1().size()];
        for (int i = 0; i < data.keyNrs1.length; i++) {
          data.keyNrs1[i] = data.oneMeta.indexOfValue(meta.getKeyFields1().get(i));
          if (data.keyNrs1[i] < 0) {
            String message =
                BaseMessages.getString(
                    PKG,
                    "MergeJoin.Exception.UnableToFindFieldInReferenceStream",
                    meta.getKeyFields1().get(i));
            logError(message);
            throw new HopTransformException(message);
          }
        }
      }

      if (data.two != null) {
        // Find the key indexes:
        data.keyNrs2 = new int[meta.getKeyFields2().size()];
        for (int i = 0; i < data.keyNrs2.length; i++) {
          data.keyNrs2[i] = data.twoMeta.indexOfValue(meta.getKeyFields2().get(i));
          if (data.keyNrs2[i] < 0) {
            String message =
                BaseMessages.getString(
                    PKG,
                    "MergeJoin.Exception.UnableToFindFieldInReferenceStream",
                    meta.getKeyFields2().get(i));
            logError(message);
            throw new HopTransformException(message);
          }
        }
      }
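
      // Note: key indexes are only resolved for streams that delivered at least one row; an
      // empty stream's keys are never compared because its row stays null.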

      // one_dummy: a null-filled row at the full output width. It is the base row used when
      // stream one has no matching key (right/full outer join).
      data.one_dummy = RowDataUtil.allocateRowData(data.oneMeta.size() + data.twoMeta.size());

      // two_dummy: a null-filled row at stream two's width. It is appended after stream one's
      // values when stream two has no matching key (left/full outer join).
      data.two_dummy = new Object[data.twoMeta.size()];
    }

    // From this point on, data.one and data.two are the cursor rows of the two streams; null
    // means that stream is exhausted, so guard the row-level log against null rows.
    if (isRowLevel() && data.one != null && data.two != null) {
      logRowlevel(
          BaseMessages.getString(
                  PKG, "MergeJoin.Log.DataInfo", data.oneMeta.getString(data.one) + "")
              + data.twoMeta.getString(data.two));
    }
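    // Note: data.one_optional is true for RIGHT OUTER and FULL OUTER joins (the first stream
    // may have no match); data.two_optional is true for LEFT OUTER and FULL OUTER joins.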

    /*
     * We can stop processing if any of the following is true:
     *   a) both streams are empty;
     *   b) the first stream is empty and the join type is INNER or LEFT OUTER;
     *   c) the second stream is empty and the join type is INNER or RIGHT OUTER.
     */
    if ((data.one == null && data.two == null)
        || (data.one == null && !data.one_optional)
        || (data.two == null && !data.two_optional)) {
      // Before we stop processing, we have to make sure that all rows from both input streams are
      // depleted!
      // If we don't do this, the pipeline can stall.
      //
      while (data.one != null && !isStopped()) {
        data.one = getRowFrom(data.oneRowSet);
      }
      while (data.two != null && !isStopped()) {
        data.two = getRowFrom(data.twoRowSet);
      }

      setOutputDone();
      return false;
    }

    // Determine the key order of the two current rows. An exhausted stream compares as the
    // smaller side, so the remaining rows of the other stream flow through the outer-join
    // branches of the switch below.
    if (data.one == null) {
      compare = -1;
    } else if (data.two == null) {
      compare = 1;
    } else {
      int cmp =
          data.oneMeta.compare(data.one, data.twoMeta, data.two, data.keyNrs1, data.keyNrs2);
      compare = Integer.signum(cmp);
    }

    switch (compare) {
      case 0:
        /*
         * We've got a match. To handle duplicate keys correctly we read the next record from
         * both streams. If either of the keys repeats, we have duplicates: collect all rows
         * that share the key from each stream and push the Cartesian product of the two groups
         * to the output. Otherwise, just push the single combined row.
         */
        data.one_next = getRowFrom(data.oneRowSet);
        data.two_next = getRowFrom(data.twoRowSet);

        int compare1 =
            (data.one_next == null)
                ? -1
                : data.oneMeta.compare(data.one, data.one_next, data.keyNrs1, data.keyNrs1);
        int compare2 =
            (data.two_next == null)
                ? -1
                : data.twoMeta.compare(data.two, data.two_next, data.keyNrs2, data.keyNrs2);
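        // compare1 == 0: the next row of stream one repeats the current key.
        // compare2 == 0: the next row of stream two repeats the current key.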
        if (compare1 == 0 || compare2 == 0) { // Duplicate keys

          if (data.ones == null) {
            data.ones = new ArrayList<>();
          } else {
            data.ones.clear();
          }
          if (data.twos == null) {
            data.twos = new ArrayList<>();
          } else {
            data.twos.clear();
          }
          data.ones.add(data.one);
          if (compare1 == 0) {
            // First stream has duplicates

            data.ones.add(data.one_next);
            while (!isStopped()) {
              data.one_next = getRowFrom(data.oneRowSet);
              boolean sameKey =
                  data.one_next != null
                      && data.oneMeta.compare(data.one, data.one_next, data.keyNrs1, data.keyNrs1)
                          == 0;
              if (!sameKey) {
                break;
              }
              data.ones.add(data.one_next);
            }
            if (isStopped()) {
              return false;
            }
          }
          data.twos.add(data.two);
          if (compare2 == 0) { // Second stream has duplicates

            data.twos.add(data.two_next);
            while (!isStopped()) {
              data.two_next = getRowFrom(data.twoRowSet);
              boolean sameKey =
                  data.two_next != null
                      && data.twoMeta.compare(data.two, data.two_next, data.keyNrs2, data.keyNrs2)
                          == 0;
              if (!sameKey) {
                break;
              }
              data.twos.add(data.two_next);
            }
            if (isStopped()) {
              return false;
            }
          }
          for (Iterator<Object[]> oneIter = data.ones.iterator();
              oneIter.hasNext() && !isStopped(); ) {
            Object[] one = oneIter.next();
            for (Iterator<Object[]> twoIter = data.twos.iterator();
                twoIter.hasNext() && !isStopped(); ) {
              Object[] two = twoIter.next();
              Object[] oneBig =
                  RowDataUtil.createResizedCopy(one, data.oneMeta.size() + data.twoMeta.size());
              Object[] combi = RowDataUtil.addRowData(oneBig, data.oneMeta.size(), two);
              putRow(data.outputRowMeta, combi);
            }
            // Remove the rows as we merge them to keep the overall memory footprint minimal
            oneIter.remove();
          }
          data.twos.clear();
        } else {
          // No duplicates

          Object[] outputRowData = RowDataUtil.addRowData(data.one, data.oneMeta.size(), data.two);
          putRow(data.outputRowMeta, outputRowData);
        }
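        // Advance both cursors: one_next/two_next already hold the first rows beyond this key.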
        data.one = data.one_next;
        data.two = data.two_next;
        break;
      case 1:
        /*
         * The first stream's key is greater than the second stream's. This means:
         *   a) this key is missing in the first stream, or
         *   b) the second stream may have finished.
         * If a full/right outer join is requested and the second row is present, push an output
         * row with only the second stream's values populated. Then, if the second stream is not
         * finished, read its next row; otherwise signal that we are done.
         */
        if (data.one_optional) {
          if (data.two != null) {
            Object[] outputRowData =
                RowDataUtil.createResizedCopy(data.one_dummy, data.outputRowMeta.size());
            outputRowData = RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two);
            putRow(data.outputRowMeta, outputRowData);
            data.two = getRowFrom(data.twoRowSet);
          } else if (!data.two_optional) {
            /*
             * If we are doing right outer join then we are done since there are no more rows in the second set
             */
            // Before we stop processing, we have to make sure that all rows from both input streams
            // are depleted!
            // If we don't do this, the pipeline can stall.
            //
            while (data.one != null && !isStopped()) {
              data.one = getRowFrom(data.oneRowSet);
            }
            while (data.two != null && !isStopped()) {
              data.two = getRowFrom(data.twoRowSet);
            }

            setOutputDone();
            return false;
          } else {
            /*
             * We are doing full outer join so print the 1st stream and get the next row from 1st stream
             */
            Object[] outputRowData =
                RowDataUtil.createResizedCopy(data.one, data.outputRowMeta.size());
            outputRowData =
                RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two_dummy);
            putRow(data.outputRowMeta, outputRowData);
            data.one = getRowFrom(data.oneRowSet);
          }
        } else if (data.two == null && data.two_optional) {
          /*
           * We have reached the end of stream 2 while rows remain in the first stream, and the
           * join is LEFT or FULL OUTER. Create a row with just the values of the first stream
           * and push it forward.
           */
          Object[] outputRowData =
              RowDataUtil.createResizedCopy(data.one, data.outputRowMeta.size());
          outputRowData =
              RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two_dummy);
          putRow(data.outputRowMeta, outputRowData);
          data.one = getRowFrom(data.oneRowSet);
        } else if (data.two != null) {
          /*
           * We are doing an inner or left outer join, so throw this row away from the 2nd stream
           */
          data.two = getRowFrom(data.twoRowSet);
        }
        break;
      case -1:
        /*
         * The second stream's key is greater than the first stream's. This means:
         *   a) this key is missing in the second stream, or
         *   b) the first stream may have finished.
         * If a full/left outer join is requested and the first row is present, push an output
         * row with only the first stream's values populated. Then, if the first stream is not
         * finished, read its next row; otherwise signal that we are done.
         */
        if (data.two_optional) {
          if (data.one != null) {
            Object[] outputRowData =
                RowDataUtil.createResizedCopy(data.one, data.outputRowMeta.size());
            outputRowData =
                RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two_dummy);
            putRow(data.outputRowMeta, outputRowData);
            data.one = getRowFrom(data.oneRowSet);
          } else if (!data.one_optional) {
            /*
             * We are doing a left outer join and there are no more rows in the first stream; so we are done
             */
            // Before we stop processing, we have to make sure that all rows from both input streams
            // are depleted!
            // If we don't do this, the pipeline can stall.
            //
            while (data.one != null && !isStopped()) {
              data.one = getRowFrom(data.oneRowSet);
            }
            while (data.two != null && !isStopped()) {
              data.two = getRowFrom(data.twoRowSet);
            }

            setOutputDone();
            return false;
          } else {
            /*
             * We are doing a full outer join so print the 2nd stream and get the next row from the 2nd stream
             */
            Object[] outputRowData =
                RowDataUtil.createResizedCopy(data.one_dummy, data.outputRowMeta.size());
            outputRowData = RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two);
            putRow(data.outputRowMeta, outputRowData);
            data.two = getRowFrom(data.twoRowSet);
          }
        } else if (data.one == null && data.one_optional) {
          /*
           * We have reached the end of stream 1 and there are records present in the second stream. Also, join is right
           * or full outer. So, create a row with just the values in the 2nd stream and push it forward
           */
          Object[] outputRowData =
              RowDataUtil.createResizedCopy(data.one_dummy, data.outputRowMeta.size());
          outputRowData = RowDataUtil.addRowData(outputRowData, data.oneMeta.size(), data.two);
          putRow(data.outputRowMeta, outputRowData);
          data.two = getRowFrom(data.twoRowSet);
        } else if (data.one != null) {
          /*
           * We are doing an inner or right outer join so a non-matching row in the first stream is of no use to us -
           * throw it away and get the next row
           */
          data.one = getRowFrom(data.oneRowSet);
        }
        break;
      default:
        logDebug("We shouldn't be here!!");
        // Make sure we do not go into an infinite loop by continuing to read data
        data.one = getRowFrom(data.oneRowSet);
        data.two = getRowFrom(data.twoRowSet);
        break;
    }
    if (checkFeedback(getLinesRead())) {
      logBasic(BaseMessages.getString(PKG, "MergeJoin.LineNumber") + getLinesRead());
    }
    return true;
  }
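
The duplicate-key handling in case 0 is the subtle part of the algorithm. Here is a standalone sketch (hypothetical names and types, not the Hop API) of the same idea in isolation: walk two key-sorted lists in step, collect the run of equal-keyed rows on each side, and emit the Cartesian product of the two runs, just as the data.ones x data.twos loops above do.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of a sorted merge join with duplicate keys (inner join only).
public class MergeJoinSketch {

  // Hypothetical row type: an integer join key plus an opaque payload.
  record Row(int key, String payload) {}

  static List<String> innerJoin(List<Row> one, List<Row> two) {
    List<String> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < one.size() && j < two.size()) {
      int cmp = Integer.compare(one.get(i).key(), two.get(j).key());
      if (cmp < 0) {
        i++; // Key missing on the right; a left/full outer join would emit one + nulls here.
      } else if (cmp > 0) {
        j++; // Key missing on the left; a right/full outer join would emit nulls + two here.
      } else {
        // Collect the full run of rows sharing this key on both sides...
        int key = one.get(i).key();
        int iEnd = i;
        while (iEnd < one.size() && one.get(iEnd).key() == key) {
          iEnd++;
        }
        int jEnd = j;
        while (jEnd < two.size() && two.get(jEnd).key() == key) {
          jEnd++;
        }
        // ...then emit the Cartesian product of the two runs, like data.ones x data.twos.
        for (int a = i; a < iEnd; a++) {
          for (int b = j; b < jEnd; b++) {
            out.add(one.get(a).payload() + "|" + two.get(b).payload());
          }
        }
        i = iEnd;
        j = jEnd;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Row> one = List.of(new Row(1, "a1"), new Row(2, "a2"), new Row(2, "a3"));
    List<Row> two = List.of(new Row(2, "b1"), new Row(2, "b2"), new Row(3, "b3"));
    // Key 2 appears twice on both sides, so it yields 2 x 2 = 4 combined rows:
    System.out.println(innerJoin(one, two)); // [a2|b1, a2|b2, a3|b1, a3|b2]
  }
}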