public Result execute()

in plugins/actions/pipeline/src/main/java/org/apache/hop/workflow/actions/pipeline/ActionPipeline.java [287:612]


  /**
   * Runs the pipeline referenced by this action and records the outcome in {@code result}.
   *
   * <p>The pipeline metadata is loaded once. The pipeline is then executed either a single time,
   * or, when {@code execPerRow} is set, once for every row that was present in {@code result}
   * when this method was called. In per-row mode the loop ends early as soon as an error has
   * been recorded or the parent workflow has been stopped.
   *
   * <p>When {@code setLogfile} is set, a log file writer is started before the pipeline is loaded
   * and is always stopped again before this method returns or throws once it has been started.
   *
   * @param result the result of the previous action; it is updated in place and returned
   * @param nr the entry number to store on the result
   * @return the same {@code result} instance; its result flag is {@code true} only when no errors
   *     were recorded
   * @throws HopException if setting up the named parameters for an iteration fails (that step
   *     runs outside the per-iteration catch block)
   */
  public Result execute(Result result, int nr) throws HopException {
    result.setEntryNr(nr);

    LogChannelFileWriter logChannelFileWriter = null;

    // The pipeline inherits the parent workflow log level unless a dedicated log file is used.
    LogLevel pipelineLogLevel = parentWorkflow.getLogLevel();

    String realLogFilename = "";
    if (setLogfile) {
      pipelineLogLevel = logFileLevel;

      realLogFilename = resolve(getLogFilename());

      // We need to check here the log filename
      // if we do not have one, we must fail
      if (Utils.isEmpty(realLogFilename)) {
        logError(BaseMessages.getString(PKG, "ActionPipeline.Exception.LogFilenameMissing"));
        result.setNrErrors(1);
        result.setResult(false);
        return result;
      }
      // create parent folder?
      if (!FileUtil.createParentFolder(
          PKG, realLogFilename, createParentFolder, this.getLogChannel())) {
        result.setNrErrors(1);
        result.setResult(false);
        return result;
      }
      try {
        logChannelFileWriter =
            new LogChannelFileWriter(
                this.getLogChannelId(), HopVfs.getFileObject(realLogFilename), setAppendLogfile);
        logChannelFileWriter.startLogging();
      } catch (HopException e) {
        logError(
            BaseMessages.getString(
                PKG, "ActionPipeline.Error.UnableOpenAppender", realLogFilename, e.toString()));

        logError(Const.getStackTracker(e));
        result.setNrErrors(1);
        result.setResult(false);
        return result;
      }
    }

    // From here on the log file writer may be running: the finally block below guarantees that it
    // is stopped on every exit path, including the early return when the pipeline cannot be
    // loaded and any exception that escapes the loop.
    try {
      logDetailed(
          BaseMessages.getString(
              PKG, "ActionPipeline.Log.OpeningPipeline", resolve(getFilename())));

      // Load the pipeline only once for the complete loop!
      // If loading fails (for example because the XML file doesn't exist), log the error and
      // return an error condition.
      //
      PipelineMeta pipelineMeta;
      try {
        pipelineMeta = getPipelineMeta(getMetadataProvider(), this);
      } catch (HopException e) {
        logError(
            BaseMessages.getString(
                PKG,
                "ActionPipeline.Exception.UnableToRunWorkflow",
                parentWorkflowMeta.getName(),
                getName(),
                StringUtils.trim(e.getMessage())),
            e);
        result.setNrErrors(1);
        result.setResult(false);
        return result;
      }

      int iteration = 0;
      boolean first = true;

      // Snapshot of the incoming rows: the result's own row list is modified inside the loop.
      List<RowMetaAndData> rows = new ArrayList<>(result.getRows());

      // NOTE: the stop check deliberately guards only the per-row branch. This matches the
      // operator precedence of the original, unparenthesized condition: a single (non per-row)
      // execution is attempted even if the parent workflow has already been stopped.
      while ((first && !execPerRow)
          || (execPerRow
              && iteration < rows.size()
              && result.getNrErrors() == 0
              && !parentWorkflow.isStopped())) {
        // Clear the result rows of the result
        // Otherwise we double the amount of rows every iteration in the simple cases.
        //
        if (execPerRow) {
          result.getRows().clear();
        }
        RowMetaAndData resultRow = execPerRow ? rows.get(iteration) : null;

        // Define every named parameter of this action and give it its value for this iteration.
        //
        INamedParameters namedParam = new NamedParameters();
        for (Parameter parameter : parameterDefinition.getParameters()) {
          if (!Utils.isEmpty(parameter.getName())) {
            namedParam.addParameterDefinition(parameter.getName(), "", "Action runtime");
            namedParam.setParameterValue(
                parameter.getName(), determineParameterValue(parameter, resultRow));
          }
        }

        first = false;

        // Same object as 'result'; the alias marks its role as input for the pipeline.
        Result previousResult = result;

        try {
          if (isDetailed()) {
            logDetailed(
                BaseMessages.getString(
                    PKG,
                    "ActionPipeline.StartingPipeline",
                    getFilename(),
                    getName(),
                    getDescription()));
          }

          if (clearResultRows) {
            previousResult.setRows(new ArrayList<>());
          }

          if (clearResultFiles) {
            previousResult.getResultFiles().clear();
          }

          // Set one or more "result" rows on the pipeline...
          //
          if (execPerRow) {
            // Execute for each input row: just pass the single current row.
            // The list it is added to was cleared at the top of this iteration.
            //
            previousResult.getRows().add(resultRow);
          }

          if (paramsFromPrevious) {
            // Copy the input to the parameters
            for (Parameter parameter : parameterDefinition.getParameters()) {
              if (!Utils.isEmpty(parameter.getName())) {
                namedParam.setParameterValue(
                    parameter.getName(),
                    Const.NVL(determineParameterValue(parameter, resultRow), ""));
              }
            }
          }

          // Handle the parameters...
          //
          String[] parameterNames = pipelineMeta.listParameters();

          prepareFieldNamesParameters(parameterDefinition.getParameters(), namedParam, this);

          if (StringUtils.isEmpty(runConfiguration)) {
            throw new HopException(
                "This action needs a run configuration to use to execute the specified pipeline");
          }

          // Resolve into a local so the configured (possibly variable-based) value of the field
          // is not overwritten by its resolved form.
          String realRunConfiguration = resolve(runConfiguration);
          logBasic(
              BaseMessages.getString(
                  PKG, "ActionPipeline.RunConfig.Message", realRunConfiguration));

          // Create the pipeline from meta-data
          //
          pipeline =
              PipelineEngineFactory.createPipelineEngine(
                  this, realRunConfiguration, getMetadataProvider(), pipelineMeta);
          pipeline.setParent(this);

          // set the parent workflow on the pipeline, variables are taken from here...
          //
          pipeline.setParentWorkflow(parentWorkflow);
          pipeline.setParentVariables(parentWorkflow);
          pipeline.setLogLevel(pipelineLogLevel);
          pipeline.setPreviousResult(previousResult);

          // inject the metadataProvider
          pipeline.setMetadataProvider(getMetadataProvider());

          // Handle parameters...
          //
          pipeline.initializeFrom(null);
          pipeline.copyParametersFromDefinitions(pipelineMeta);

          // Pass the parameter values and activate...
          //
          TransformWithMappingMeta.activateParams(
              pipeline,
              pipeline,
              this,
              parameterNames,
              parameterDefinition.getNames(),
              parameterDefinition.getValues(),
              parameterDefinition.isPassingAllParameters());

          try {
            // Start execution...
            //
            pipeline.execute();

            // Wait until we're done with this pipeline
            //
            if (isWaitingToFinish()) {
              pipeline.waitUntilFinished();

              if (parentWorkflow.isStopped() || pipeline.getErrors() != 0) {
                pipeline.stopAll();
                result.setNrErrors(1);
              }
              updateResult(result);
            }
            if (setLogfile) {
              ResultFile resultFile =
                  new ResultFile(
                      ResultFile.FILE_TYPE_LOG,
                      HopVfs.getFileObject(realLogFilename),
                      parentWorkflow.getWorkflowName(),
                      toString());
              result.getResultFiles().put(resultFile.getFile().toString(), resultFile);
            }
          } catch (HopException e) {

            logError(BaseMessages.getString(PKG, "ActionPipeline.Error.UnablePrepareExec"), e);
            result.setNrErrors(1);
          }

        } catch (Exception e) {

          logError(
              BaseMessages.getString(
                  PKG, "ActionPipeline.ErrorUnableOpenPipeline", e.getMessage()));
          logError(Const.getStackTracker(e));
          result.setNrErrors(1);
        }
        iteration++;
      }
    } finally {
      if (logChannelFileWriter != null) {
        logChannelFileWriter.stopLogging();
      }
    }

    if (logChannelFileWriter != null) {
      ResultFile resultFile =
          new ResultFile(
              ResultFile.FILE_TYPE_LOG,
              logChannelFileWriter.getLogFile(),
              parentWorkflow.getWorkflowName(),
              getName());
      result.getResultFiles().put(resultFile.getFile().toString(), resultFile);

      // See if anything went wrong during file writing...
      //
      if (logChannelFileWriter.getException() != null) {
        logError("Unable to open log file [" + getLogFilename() + "] : ");
        logError(Const.getStackTracker(logChannelFileWriter.getException()));
        result.setNrErrors(1);
        result.setResult(false);
        return result;
      }
    }

    result.setResult(result.getNrErrors() == 0);

    return result;
  }

  /**
   * Determines the value of one action parameter for the current iteration.
   *
   * <p>When the parameter has no field name, its resolved static value is used (empty string when
   * that is null). Otherwise the value is read from the given row by field name; without a row
   * the value is the empty string.
   */
  private String determineParameterValue(Parameter parameter, RowMetaAndData resultRow)
      throws HopException {
    if (Utils.isEmpty(Const.trim(parameter.getField()))) {
      // There is no field name specified.
      return Const.NVL(resolve(parameter.getValue()), "");
    }
    // something filled in, in the field column...
    return resultRow == null ? "" : resultRow.getString(parameter.getField(), "");
  }