public void prepareExecution()

in engine/src/main/java/org/apache/hop/pipeline/Pipeline.java [539:1096]


  public void prepareExecution() throws HopException {
    setPreparing(true);
    executionStartDate = new Date();
    setRunning(false);

    // We create the log channel only when we're ready to rock and roll;
    // before that it makes little sense, so we default to GENERAL until then.
    // We force the creation of a new log channel ID every time we run this pipeline.
    //
    this.log = new LogChannel(this, parent, isGatheringMetrics(), true);
    this.log.setLogLevel(logLevel);

    if (this.containerObjectId == null) {
      this.containerObjectId = log.getContainerObjectId();
    }

    if (log.isDebug()) {
      log.logDebug(
          BaseMessages.getString(
              PKG,
              "Pipeline.Log.NumberOfTransformsToRun",
              String.valueOf(pipelineMeta.nrTransforms()),
              String.valueOf(pipelineMeta.nrPipelineHops())));
    }

    log.snap(Metrics.METRIC_PIPELINE_EXECUTION_START);
    log.snap(Metrics.METRIC_PIPELINE_INIT_START);
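    // The two snapshots above mark the start of overall pipeline execution and
    // of the initialization phase in the gathered metrics.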

    log.logBasic(
        "Executing this pipeline using the Local Pipeline Engine with run configuration '"
            + pipelineRunConfiguration.getName()
            + "'");

    ExtensionPointHandler.callExtensionPoint(
        log, this, HopExtensionPoint.PipelinePrepareExecution.id, this);

    activateParameters(this);

    if (pipelineMeta.getName() == null) {
      if (pipelineMeta.getFilename() != null) {
        log.logBasic(
            BaseMessages.getString(
                PKG, "Pipeline.Log.ExecutionStartedForFilename", pipelineMeta.getFilename()));
      }
    } else {
      log.logBasic(
          BaseMessages.getString(
              PKG, "Pipeline.Log.ExecutionStartedForPipeline", pipelineMeta.getName()));
    }

    if (isSafeModeEnabled()) {
      if (log.isDetailed()) {
        log.logDetailed(
            BaseMessages.getString(PKG, "Pipeline.Log.SafeModeIsEnabled", pipelineMeta.getName()));
      }
    }

    // setInternalHopVariables(this); --> Don't do this: when running without a
    // file (for example remotely) it causes problems.

    // Keep track of all the row sets and allocated transforms
    //
    transforms = Collections.synchronizedList(new ArrayList<>());
    rowsets = new ArrayList<>();

    List<TransformMeta> hopTransforms = pipelineMeta.getPipelineHopTransforms(false);

    if (log.isDetailed()) {
      log.logDetailed(
          BaseMessages.getString(
              PKG, "Pipeline.Log.FoundDefferentTransforms", String.valueOf(hopTransforms.size())));
      log.logDetailed(BaseMessages.getString(PKG, "Pipeline.Log.AllocatingRowsets"));
    }
    // First allocate all the rowsets required!
    // Note that a mapping doesn't receive ANY input or output rowsets...
    //
    for (int i = 0; i < hopTransforms.size(); i++) {
      TransformMeta thisTransform = hopTransforms.get(i);
      if (thisTransform.isMapping()) {
        continue; // handled and allocated by the mapping transform itself.
      }

      if (log.isDetailed()) {
        log.logDetailed(
            BaseMessages.getString(
                PKG,
                "Pipeline.Log.AllocateingRowsetsForTransform",
                String.valueOf(i),
                thisTransform.getName()));
      }

      List<TransformMeta> nextTransforms = pipelineMeta.findNextTransforms(thisTransform);
      for (TransformMeta nextTransform : nextTransforms) {
        // What's the next transform?
        if (nextTransform.isMapping()) {
          continue; // handled and allocated by the mapping transform itself.
        }

        // How many times do we start the source transform?
        int thisCopies = thisTransform.getCopies(this);

        if (thisCopies < 0) {
          // This can only happen if a variable is used that didn't resolve to a positive integer
          // value
          //
          throw new HopException(
              BaseMessages.getString(
                  PKG, "Pipeline.Log.TransformCopiesNotCorrectlyDefined", thisTransform.getName()));
        }

        // How many times do we start the target transform?
        int nextCopies = nextTransform.getCopies(this);

        // Are we re-partitioning?
        boolean repartitioning;
        if (thisTransform.isPartitioned()) {
          repartitioning =
              !thisTransform
                  .getTransformPartitioningMeta()
                  .equals(nextTransform.getTransformPartitioningMeta());
        } else {
          repartitioning = nextTransform.isPartitioned();
        }

        int nrCopies;
        if (log.isDetailed()) {
          log.logDetailed(
              BaseMessages.getString(
                  PKG,
                  "Pipeline.Log.copiesInfo",
                  String.valueOf(thisCopies),
                  String.valueOf(nextCopies)));
        }
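        // Determine the dispatch type: how copies of the source transform map
        // onto copies of the target transform (1:1, 1:N, N:1, N:N swim lanes,
        // or a full N x M mesh when the copy counts differ or we re-partition).
        //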
        int dispatchType;
        if (thisCopies == 1 && nextCopies == 1) {
          dispatchType = TYPE_DISP_1_1;
          nrCopies = 1;
        } else if (thisCopies == 1 && nextCopies > 1) {
          dispatchType = TYPE_DISP_1_N;
          nrCopies = nextCopies;
        } else if (thisCopies > 1 && nextCopies == 1) {
          dispatchType = TYPE_DISP_N_1;
          nrCopies = thisCopies;
        } else if (thisCopies == nextCopies && !repartitioning) {
          dispatchType = TYPE_DISP_N_N;
          nrCopies = nextCopies;
        } else {
          // Copy counts differ (or we re-partition) with more than one copy
          // on each side: full N x M dispatch.
          dispatchType = TYPE_DISP_N_M;
          nrCopies = nextCopies;
        }

        // Allocate the rowsets
        //
        if (dispatchType != TYPE_DISP_N_M) {
          for (int c = 0; c < nrCopies; c++) {
            IRowSet rowSet;
            switch (pipelineMeta.getPipelineType()) {
              case Normal:
                // This is a temporary patch until the batching rowset has proven
                // to be working in all situations.
                // Currently there are stalling problems when dealing with small
                // amounts of rows.
                //
                Boolean batchingRowSet =
                    ValueMetaBase.convertStringToBoolean(
                        System.getProperty(Const.HOP_BATCHING_ROWSET));
                if (batchingRowSet != null && batchingRowSet.booleanValue()) {
                  rowSet = new BlockingBatchingRowSet(rowSetSize);
                } else {
                  rowSet = new BlockingRowSet(rowSetSize);
                }
                break;

              case SingleThreaded:
                rowSet = new QueueRowSet();
                break;

              default:
                throw new HopException(
                    "Unhandled pipeline type: " + pipelineMeta.getPipelineType());
            }

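            // Tag the rowset with the producing and consuming transform names
            // and copy numbers, so the matching input and output rowsets can be
            // found when the transform threads are wired up.
            //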
            switch (dispatchType) {
              case TYPE_DISP_1_1:
                rowSet.setThreadNameFromToCopy(
                    thisTransform.getName(), 0, nextTransform.getName(), 0);
                break;
              case TYPE_DISP_1_N:
                rowSet.setThreadNameFromToCopy(
                    thisTransform.getName(), 0, nextTransform.getName(), c);
                break;
              case TYPE_DISP_N_1:
                rowSet.setThreadNameFromToCopy(
                    thisTransform.getName(), c, nextTransform.getName(), 0);
                break;
              case TYPE_DISP_N_N:
                rowSet.setThreadNameFromToCopy(
                    thisTransform.getName(), c, nextTransform.getName(), c);
                break;
              default:
                break;
            }
            rowsets.add(rowSet);
            if (log.isDetailed()) {
              log.logDetailed(
                  BaseMessages.getString(
                      PKG, "Pipeline.PipelineAllocatedNewRowset", rowSet.toString()));
            }
          }
        } else {
          // For N source transform copies we have M target transform copies.
          //
          // Each source copy sends rows to every target copy.
          // This allows maximum flexibility for re-partitioning,
          // distribution...
          for (int s = 0; s < thisCopies; s++) {
            for (int t = 0; t < nextCopies; t++) {
              BlockingRowSet rowSet = new BlockingRowSet(rowSetSize);
              rowSet.setThreadNameFromToCopy(
                  thisTransform.getName(), s, nextTransform.getName(), t);
              rowsets.add(rowSet);
              if (log.isDetailed()) {
                log.logDetailed(
                    BaseMessages.getString(
                        PKG, "Pipeline.PipelineAllocatedNewRowset", rowSet.toString()));
              }
            }
          }
        }
      }
      if (log.isDetailed()) {
        log.logDetailed(
            BaseMessages.getString(
                PKG,
                "Pipeline.Log.AllocatedRowsets",
                String.valueOf(rowsets.size()),
                String.valueOf(i),
                thisTransform.getName()));
      }
    }

    if (log.isDetailed()) {
      log.logDetailed(
          BaseMessages.getString(PKG, "Pipeline.Log.AllocatingTransformsAndTransformData"));
    }

    // Allocate the transforms & the data...
    //
    for (TransformMeta transformMeta : hopTransforms) {
      String transformId = transformMeta.getTransformPluginId();

      if (log.isDetailed()) {
        log.logDetailed(
            BaseMessages.getString(
                PKG,
                "Pipeline.Log.PipelineIsToAllocateTransform",
                transformMeta.getName(),
                transformId));
      }

      // How many copies of this transform are launched?
      int nrCopies = transformMeta.getCopies(this);

      if (log.isDebug()) {
        log.logDebug(
            BaseMessages.getString(
                PKG, "Pipeline.Log.TransformHasNumberRowCopies", String.valueOf(nrCopies)));
      }

      // Every transform runs at least once: launch one instance per copy.
      for (int c = 0; c < nrCopies; c++) {
        // Make sure we haven't started it yet!
        if (!hasTransformStarted(transformMeta.getName(), c)) {
          TransformMetaDataCombi combi = new TransformMetaDataCombi();
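          // The combi bundles everything for one transform copy: the metadata,
          // the runtime data object and the transform instance itself.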

          combi.transformName = transformMeta.getName();
          combi.copy = c;

          // The meta-data
          combi.transformMeta = transformMeta;
          combi.meta = transformMeta.getTransform();

          // Allocate the transform data
          ITransformData data = combi.meta.createTransformData();
          combi.data = data;

          // Allocate the transform
          ITransform transform =
              combi.meta.createTransform(transformMeta, data, c, pipelineMeta, this);

          // Copy the variables of the pipeline to the transform...
          // don't share. Each copy of the transform has its own variables.
          //
          transform.initializeFrom(this);

          // Pass the metadataProvider to the transforms runtime
          //
          transform.setMetadataProvider(metadataProvider);

          // If the transform is partitioned, set the partition ID and some other
          // things as well...
          if (transformMeta.isPartitioned()) {
            List<String> partitionIDs =
                transformMeta
                    .getTransformPartitioningMeta()
                    .getPartitionSchema()
                    .calculatePartitionIds(this);
            if (partitionIDs != null && !partitionIDs.isEmpty()) {
              // Pass the partition ID to the transform
              transform.setPartitionId(partitionIDs.get(c));
            }
          }

          // Save the transform too
          combi.transform = transform;

          // Pass the logging level and metrics gathering down to the transform level.
          //
          if (combi.transform instanceof ILoggingObject) {
            ILogChannel logChannel = combi.transform.getLogChannel();
            logChannel.setLogLevel(logLevel);
            logChannel.setGatheringMetrics(log.isGatheringMetrics());
          }

          // Add to the bunch...
          transforms.add(combi);

          if (log.isDetailed()) {
            log.logDetailed(
                BaseMessages.getString(
                    PKG,
                    "Pipeline.Log.PipelineHasAllocatedANewTransform",
                    transformMeta.getName(),
                    String.valueOf(c)));
          }
        }
      }
    }

    // Now we need to verify if certain rowsets are not meant to be for error
    // handling...
    // Loop over the transforms and for every transform verify the output rowsets
    // If a rowset is going to a target transform in the transforms error handling
    // metadata, set it to the errorRowSet.
    // The input rowsets are already in place, so the next transform just accepts the
    // rows.
    // Metadata wise we need to do the same trick in PipelineMeta
    //
    for (TransformMetaDataCombi combi : transforms) {
      if (combi.transformMeta.isDoingErrorHandling()) {
        combi.transform.identifyErrorOutput();
      }
    }

    // Now (optionally) write start log record!
    // Make sure we synchronize appropriately to avoid duplicate batch IDs.
    //
    Object syncObject = this;
    if (parentWorkflow != null) {
      syncObject = parentWorkflow; // parallel execution in a workflow
    }
    if (parentPipeline != null) {
      syncObject = parentPipeline; // multiple sub-pipelines
    }
    synchronized (syncObject) {
      calculateBatchIdAndDateRange();
      beginProcessing();
    }

    // Set the partition-to-rowset mapping
    //
    for (TransformMetaDataCombi sid : transforms) {
      TransformMeta transformMeta = sid.transformMeta;
      ITransform baseTransform = sid.transform;

      baseTransform.setPartitioned(transformMeta.isPartitioned());

      // Now let's take a look at the source and target relation
      //
      // If this source transform is not partitioned, and the target transform is: it
      // means we need to re-partition the incoming data.
      // If both transforms are partitioned on the same method and schema, we don't
      // need to re-partition
      // If both transforms are partitioned on a different method or schema, we need
      // to re-partition as well.
      // If both transforms are not partitioned, we don't need to re-partition
      //
      boolean isThisPartitioned = transformMeta.isPartitioned();
      PartitionSchema thisPartitionSchema = null;
      if (isThisPartitioned) {
        thisPartitionSchema = transformMeta.getTransformPartitioningMeta().getPartitionSchema();
      }

      boolean isNextPartitioned = false;
      TransformPartitioningMeta nextTransformPartitioningMeta = null;
      PartitionSchema nextPartitionSchema = null;

      List<TransformMeta> nextTransforms = pipelineMeta.findNextTransforms(transformMeta);
      for (TransformMeta nextTransform : nextTransforms) {
        if (nextTransform.isPartitioned()) {
          isNextPartitioned = true;
          nextTransformPartitioningMeta = nextTransform.getTransformPartitioningMeta();
          nextPartitionSchema = nextTransformPartitioningMeta.getPartitionSchema();
        }
      }

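      // Default to no repartitioning; the checks below override this when the
      // hop crosses a partitioning boundary.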
      baseTransform.setRepartitioning(TransformPartitioningMeta.PARTITIONING_METHOD_NONE);

      // If the next transform is partitioned differently, set re-partitioning, when
      // running locally.
      //
      if ((!isThisPartitioned && isNextPartitioned)
          || (isThisPartitioned
              && isNextPartitioned
              && !thisPartitionSchema.equals(nextPartitionSchema))) {
        baseTransform.setRepartitioning(nextTransformPartitioningMeta.getMethodType());
      }

      // For partitioning to a set of remote transforms (repartitioning from a
      // master to a set of remote output transforms)
      //
      TransformPartitioningMeta targetTransformPartitioningMeta =
          baseTransform.getTransformMeta().getTargetTransformPartitioningMeta();
      if (targetTransformPartitioningMeta != null) {
        baseTransform.setRepartitioning(targetTransformPartitioningMeta.getMethodType());
      }
    }

    setPreparing(false);
    setInitializing(true);

    // Do a topological sort... With over 150 transform copies this might
    // slow things down too much.
    //
    if (isSortingTransformsTopologically() && transforms.size() < 150) {
      doTopologySortOfTransforms();
    }

    if (log.isDetailed()) {
      log.logDetailed(
          BaseMessages.getString(
              PKG, "Pipeline.Log.InitialisingTransforms", String.valueOf(transforms.size())));
    }

    TransformInitThread[] initThreads = new TransformInitThread[transforms.size()];
    Thread[] threads = new Thread[transforms.size()];

    // Initialize all the threads...
    //
    for (int i = 0; i < transforms.size(); i++) {
      final TransformMetaDataCombi sid = transforms.get(i);

      // Do the init code in the background!
      // Init all transforms at once, but ALL transforms need to finish before we can
      // continue properly!
      //
      initThreads[i] = new TransformInitThread(sid, this, log);

      // Put it in a separate thread!
      //
      threads[i] = new Thread(initThreads[i]);
      threads[i].setName(
          "init of " + sid.transformName + "." + sid.copy + " (" + threads[i].getName() + ")");

      ExtensionPointHandler.callExtensionPoint(
          log, this, HopExtensionPoint.TransformBeforeInitialize.id, initThreads[i]);

      threads[i].start();
    }

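    // Wait for all init threads to finish before inspecting the results.
    //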
    for (int i = 0; i < threads.length; i++) {
      try {
        threads[i].join();
        ExtensionPointHandler.callExtensionPoint(
            log, this, HopExtensionPoint.TransformAfterInitialize.id, initThreads[i]);
      } catch (Exception ex) {
        log.logError("Error with init thread: " + ex.getMessage(), ex.getMessage());
        log.logError(Const.getStackTracker(ex));
      }
    }

    setInitializing(false);
    boolean ok = true;

    // All transforms are initialized now: see if any of them failed to do so
    // correctly!
    //
    for (TransformInitThread thread : initThreads) {
      TransformMetaDataCombi combi = thread.getCombi();
      if (!thread.isOk()) {
        log.logError(
            BaseMessages.getString(
                PKG, "Pipeline.Log.TransformFailedToInit", combi.transformName + "." + combi.copy));
        combi.data.setStatus(ComponentExecutionStatus.STATUS_STOPPED);
        ok = false;
      } else {
        combi.data.setStatus(ComponentExecutionStatus.STATUS_IDLE);
        if (log.isDetailed()) {
          log.logDetailed(
              BaseMessages.getString(
                  PKG,
                  "Pipeline.Log.TransformInitialized",
                  combi.transformName + "." + combi.copy));
        }
      }
    }

    if (!ok) {
      // Halt the other threads as well, signal end-of-the line to the outside
      // world...
      // Also explicitly call dispose() to clean up resources opened during
      // init()
      //
      for (TransformInitThread initThread : initThreads) {
        TransformMetaDataCombi combi = initThread.getCombi();

        // Dispose will overwrite the status, but we set it back right after
        // this.
        combi.transform.dispose();

        if (initThread.isOk()) {
          combi.data.setStatus(ComponentExecutionStatus.STATUS_HALTED);
        } else {
          combi.data.setStatus(ComponentExecutionStatus.STATUS_STOPPED);
        }
      }

      // Just for safety, fire the pipeline finished listeners...
      try {
        fireExecutionFinishedListeners();
      } catch (HopException e) {
        // a listener produced an error
        log.logError(BaseMessages.getString(PKG, "Pipeline.FinishListeners.Exception"));
        // we will not pass this exception up to the prepareExecution() entry point.
      } finally {
        // Flag the pipeline as finished even if exception was thrown
        setFinished(true);
      }

      // Pass along the log during preview. Otherwise it becomes hard to see
      // what went wrong.
      //
      if (preview) {
        String logText = HopLogStore.getAppender().getBuffer(getLogChannelId(), true).toString();
        throw new HopException(
            BaseMessages.getString(PKG, "Pipeline.Log.FailToInitializeAtLeastOneTransform")
                + Const.CR
                + logText);
      } else {
        throw new HopException(
            BaseMessages.getString(PKG, "Pipeline.Log.FailToInitializeAtLeastOneTransform")
                + Const.CR);
      }
    }

    log.snap(Metrics.METRIC_PIPELINE_INIT_STOP);

    setReadyToStart(true);
  }
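
Usage note: prepareExecution() only allocates the rowsets and initializes the
transform copies; the caller still has to start the transform threads and wait
for them to finish. A minimal sketch of that lifecycle, assuming a
LocalPipelineEngine built from an already-loaded PipelineMeta (metadata loading
and run-configuration setup are elided and may differ per engine version):

  // Hypothetical driver code; error handling kept to a minimum.
  LocalPipelineEngine pipeline = new LocalPipelineEngine(pipelineMeta);
  pipeline.setLogLevel(LogLevel.BASIC);

  pipeline.prepareExecution(); // allocate rowsets, init all transform copies
  pipeline.startThreads(); // only valid after prepareExecution() succeeded
  pipeline.waitUntilFinished();

  if (pipeline.getErrors() > 0) {
    throw new HopException("Pipeline finished with errors");
  }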