public void dispatch()

in engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java [2253:2490]


  /**
   * Connects this transform copy to the row sets of its neighbouring transforms.
   *
   * <p>Nothing happens when {@code pipelineMeta} is {@code null} (the preview case). Otherwise the
   * previous and next transforms are looked up in the pipeline metadata and, while both the input
   * and the output row set write locks are held, {@code errorRowSet} is cleared, {@code
   * prevTransforms} and {@code nextTransforms} are replaced by new arrays, {@code
   * currentInputRowSetNr} is reset to 0 and every row set returned by {@code
   * pipeline.findRowSet(...)} is added to {@code inputRowSets} or {@code outputRowSets}.
   *
   * <p>If a row set cannot be found and neither end of the hop is a mapping, an error is logged,
   * the error count is set to 1, {@code stopAll()} is called and the method returns immediately:
   * the remaining hops are not processed and the target partitioning is not picked up.
   */
  public void dispatch() {
    if (pipelineMeta == null) {
      // for preview reasons, no dispatching is done!
      return;
    }

    TransformMeta ownMeta = pipelineMeta.findTransform(transformName);

    if (log.isDetailed()) {
      logDetailed(BaseMessages.getString(PKG, "BaseTransform.Log.StartingBuffersAllocation"));
    }

    // The transforms we read from and the transforms we send output to.
    List<TransformMeta> upstream = pipelineMeta.findPreviousTransforms(ownMeta, true);
    List<TransformMeta> downstream = pipelineMeta.findNextTransforms(ownMeta);

    int nrInput = upstream.size();
    int nrOutput = downstream.size();

    inputRowSetsLock.writeLock().lock();
    outputRowSetsLock.writeLock().lock();
    try {
      errorRowSet = null;
      prevTransforms = new TransformMeta[nrInput];
      nextTransforms = new TransformMeta[nrOutput];

      // Reading starts with the first input row set.
      currentInputRowSetNr = 0;

      if (log.isDetailed()) {
        logDetailed(
            BaseMessages.getString(
                PKG,
                "BaseTransform.Log.TransformMeta",
                String.valueOf(nrInput),
                String.valueOf(nrOutput)));
      }

      if (!wireInputRowSets(ownMeta, upstream)) {
        // The error was already reported; the finally block still releases both locks.
        return;
      }
      if (!wireOutputRowSets(ownMeta, downstream)) {
        return;
      }
    } finally {
      inputRowSetsLock.writeLock().unlock();
      outputRowSetsLock.writeLock().unlock();
    }

    if (ownMeta.getTargetTransformPartitioningMeta() != null) {
      nextTransformPartitioningMeta = ownMeta.getTargetTransformPartitioningMeta();
    }

    if (log.isDetailed()) {
      logDetailed(BaseMessages.getString(PKG, "BaseTransform.Log.FinishedDispatching"));
    }
  }

  /**
   * Fills {@code prevTransforms} and adds the row sets of all incoming hops to {@code
   * inputRowSets}.
   *
   * @param ownMeta metadata of this transform
   * @param upstream the transforms that feed this transform
   * @return {@code false} when a row set was missing on a non-mapping hop (the error has been
   *     logged and {@code stopAll()} called), {@code true} otherwise
   */
  private boolean wireInputRowSets(TransformMeta ownMeta, List<TransformMeta> upstream) {
    for (int i = 0; i < upstream.size(); i++) {
      TransformMeta source = upstream.get(i);
      prevTransforms[i] = source;
      if (log.isDetailed()) {
        logDetailed(
            BaseMessages.getString(
                PKG,
                "BaseTransform.Log.GotPreviousTransform",
                transformName,
                String.valueOf(i),
                source.getName()));
      }

      // Looking at the previous transform, there can be one row set to look at or more than one.
      int sourceCopies = source.getCopies(this);
      int ownCopies = ownMeta.getCopies(this);
      if (log.isDetailed()) {
        logDetailed(
            BaseMessages.getString(
                PKG,
                "BaseTransform.Log.InputRowInfo",
                String.valueOf(sourceCopies),
                String.valueOf(ownCopies)));
      }

      boolean repartitioning = isRepartitioningHop(source, ownMeta);
      int dispatchType = dispatchTypeFor(sourceCopies, ownCopies, repartitioning);

      // Only N:1 and N:M hops need one lookup per copy of the previous transform; every other
      // hop type resolves to a single row set for this copy.
      boolean perSourceCopy =
          dispatchType == Pipeline.TYPE_DISP_N_1 || dispatchType == Pipeline.TYPE_DISP_N_M;
      int lookups = perSourceCopy ? sourceCopies : 1;

      for (int c = 0; c < lookups; c++) {
        IRowSet rowSet = lookupIncomingRowSet(dispatchType, source, c);
        if (rowSet == null) {
          if (source.isMapping() || ownMeta.isMapping()) {
            // A missing row set is tolerated when either end of the hop is a mapping.
            continue;
          }
          logError(BaseMessages.getString(PKG, "BaseTransform.Log.UnableToFindInputRowset"));
          setErrors(1);
          stopAll();
          return false;
        }

        inputRowSets.add(rowSet);
        if (log.isDetailed()) {
          logDetailed(
              BaseMessages.getString(PKG, "BaseTransform.Log.FoundInputRowset", rowSet.getName()));
        }
      }
    }
    return true;
  }

  /**
   * Fills {@code nextTransforms} and adds the row sets of all outgoing hops to {@code
   * outputRowSets}.
   *
   * @param ownMeta metadata of this transform
   * @param downstream the transforms this transform sends rows to
   * @return {@code false} when a row set was missing on a non-mapping hop (the error has been
   *     logged and {@code stopAll()} called), {@code true} otherwise
   */
  private boolean wireOutputRowSets(TransformMeta ownMeta, List<TransformMeta> downstream) {
    int nrOutput = downstream.size();
    for (int i = 0; i < nrOutput; i++) {
      TransformMeta target = downstream.get(i);
      nextTransforms[i] = target;

      int ownCopies = ownMeta.getCopies(this);
      int targetCopies = target.getCopies(this);

      if (log.isDetailed()) {
        logDetailed(
            BaseMessages.getString(
                PKG,
                "BaseTransform.Log.OutputRowInfo",
                String.valueOf(ownCopies),
                String.valueOf(targetCopies)));
      }

      boolean repartitioning = isRepartitioningHop(ownMeta, target);
      int dispatchType = dispatchTypeFor(ownCopies, targetCopies, repartitioning);

      // Only 1:N and N:M hops need one lookup per copy of the next transform; every other hop
      // type resolves to a single row set for this copy.
      boolean perTargetCopy =
          dispatchType == Pipeline.TYPE_DISP_1_N || dispatchType == Pipeline.TYPE_DISP_N_M;
      int lookups = perTargetCopy ? targetCopies : 1;

      for (int c = 0; c < lookups; c++) {
        IRowSet rowSet = lookupOutgoingRowSet(dispatchType, target, c);
        if (rowSet == null) {
          if (ownMeta.isMapping() || target.isMapping()) {
            // A missing row set is tolerated when either end of the hop is a mapping.
            continue;
          }
          logError(BaseMessages.getString(PKG, "BaseTransform.Log.UnableToFindOutputRowset"));
          setErrors(1);
          stopAll();
          return false;
        }

        outputRowSets.add(rowSet);
        if (log.isDetailed()) {
          logDetailed(
              BaseMessages.getString(
                  PKG, "BaseTransform.Log.FoundOutputRowset", rowSet.getName()));
        }
      }
    }
    return true;
  }

  /**
   * Tells whether rows travelling over the hop {@code from -> to} change partitioning: the
   * partitioning metadata differs when {@code from} is partitioned, or only {@code to} is
   * partitioned.
   */
  private static boolean isRepartitioningHop(TransformMeta from, TransformMeta to) {
    if (from.isPartitioned()) {
      return !from.getTransformPartitioningMeta().equals(to.getTransformPartitioningMeta());
    }
    return to.isPartitioned();
  }

  /**
   * Maps the copy counts on both ends of a hop to one of the {@code Pipeline.TYPE_DISP_*}
   * constants.
   */
  private static int dispatchTypeFor(int prevCopies, int nextCopies, boolean repartitioning) {
    if (prevCopies == 1 && nextCopies == 1) {
      // normal hop
      return Pipeline.TYPE_DISP_1_1;
    }
    if (prevCopies == 1 && nextCopies > 1) {
      // one to many hop
      return Pipeline.TYPE_DISP_1_N;
    }
    if (prevCopies > 1 && nextCopies == 1) {
      // from many to one hop
      return Pipeline.TYPE_DISP_N_1;
    }
    if (prevCopies == nextCopies && !repartitioning) {
      // this may be many-to-many or swim-lanes hop
      return Pipeline.TYPE_DISP_N_N;
    }
    return Pipeline.TYPE_DISP_N_M;
  }

  /**
   * Looks up the row set between copy {@code c} (where the hop type uses it) of {@code source} and
   * this transform copy; returns {@code null} for an unknown dispatch type.
   */
  private IRowSet lookupIncomingRowSet(int dispatchType, TransformMeta source, int c) {
    switch (dispatchType) {
      case Pipeline.TYPE_DISP_1_1:
        return pipeline.findRowSet(source.getName(), 0, transformName, 0);
      case Pipeline.TYPE_DISP_1_N:
        return pipeline.findRowSet(source.getName(), 0, transformName, getCopy());
      case Pipeline.TYPE_DISP_N_1:
        return pipeline.findRowSet(source.getName(), c, transformName, 0);
      case Pipeline.TYPE_DISP_N_N:
        return pipeline.findRowSet(source.getName(), getCopy(), transformName, getCopy());
      case Pipeline.TYPE_DISP_N_M:
        return pipeline.findRowSet(source.getName(), c, transformName, getCopy());
      default:
        return null;
    }
  }

  /**
   * Looks up the row set between this transform copy and copy {@code c} (where the hop type uses
   * it) of {@code target}; returns {@code null} for an unknown dispatch type.
   */
  private IRowSet lookupOutgoingRowSet(int dispatchType, TransformMeta target, int c) {
    switch (dispatchType) {
      case Pipeline.TYPE_DISP_1_1:
        return pipeline.findRowSet(transformName, 0, target.getName(), 0);
      case Pipeline.TYPE_DISP_1_N:
        return pipeline.findRowSet(transformName, 0, target.getName(), c);
      case Pipeline.TYPE_DISP_N_1:
        return pipeline.findRowSet(transformName, getCopy(), target.getName(), 0);
      case Pipeline.TYPE_DISP_N_N:
        return pipeline.findRowSet(transformName, getCopy(), target.getName(), getCopy());
      case Pipeline.TYPE_DISP_N_M:
        return pipeline.findRowSet(transformName, getCopy(), target.getName(), c);
      default:
        return null;
    }
  }