public Result execute()

in plugins/actions/workflow/src/main/java/org/apache/hop/workflow/actions/workflow/ActionWorkflow.java [249:588]


  /**
   * Runs the referenced sub-workflow and folds its outcome into {@code result}.
   *
   * <p>The sub-workflow is started once, or - when {@code execPerRow} is set - once for every row
   * that was present in {@code result} on entry, stopping early as soon as the result reports an
   * error. Each run gets its own set of named parameters (optionally seeded from the parent
   * workflow and then filled from the parameters defined on this action) and is started on a
   * dedicated thread. When {@link #isWaitingToFinish()} is true this method polls until the run
   * finishes or the parent workflow is stopped, then merges the run's result into {@code result}.
   *
   * <p>If a log file is configured, a file writer is opened before the run and is always stopped
   * again, even when the run fails with an unchecked exception.
   *
   * @param result the result of the previous action; it is updated in place and returned
   * @param nr the entry number to record on the result
   * @return the same {@code result} instance, with its success flag set to {@code false} when the
   *     error count is greater than zero
   * @throws HopException declared by the signature; {@code HopException}s raised while loading or
   *     running the sub-workflow are caught, logged and reported through the result instead
   */
  public Result execute(Result result, int nr) throws HopException {
    result.setEntryNr(nr);

    LogChannelFileWriter logChannelFileWriter = null;
    LogLevel workflowLogLevel = parentWorkflow.getLogLevel();

    if (setLogfile) {
      String realLogFilename = resolve(getLogFilename());
      // A log file was requested, so a missing filename is a hard failure.
      if (Utils.isEmpty(realLogFilename)) {
        logError(BaseMessages.getString(PKG, "ActionWorkflow.Exception.LogFilenameMissing"));
        result.setNrErrors(1);
        result.setResult(false);
        return result;
      }

      // create parent folder?
      if (!createParentFolder(realLogFilename)) {
        result.setNrErrors(1);
        result.setResult(false);
        return result;
      }
      try {
        logChannelFileWriter =
            new LogChannelFileWriter(
                this.getLogChannelId(), HopVfs.getFileObject(realLogFilename), setAppendLogfile);
        logChannelFileWriter.startLogging();
      } catch (HopException e) {
        logError(
            "Unable to open file appender for file [" + getLogFilename() + "] : " + e.toString());
        logError(Const.getStackTracker(e));
        result.setNrErrors(1);
        result.setResult(false);
        return result;
      }
      // The sub-workflow logs at the level configured for the log file.
      workflowLogLevel = logFileLevel;
    }

    try {
      // First load the workflow, outside of the loop...
      if (parentWorkflow.getWorkflowMeta() != null) {
        // Reset the internal variables again so that they can be used properly
        // in 2 sequential sub workflows.
        parentWorkflow.getWorkflowMeta().setInternalHopVariables(this);
      }

      // Explain what we are loading...
      //
      logDetailed("Loading workflow from XML file : [" + resolve(filename) + "]");

      WorkflowMeta workflowMeta = getWorkflowMeta(getMetadataProvider(), this);

      // Verify that we loaded something, complain if we did not...
      //
      if (workflowMeta == null) {
        throw new HopException("Unable to load the workflow: please specify a filename");
      }

      verifyRecursiveExecution(parentWorkflow, workflowMeta);

      int iteration = 0;

      copyFrom(parentWorkflow);
      setParentVariables(parentWorkflow);

      // Snapshot the incoming rows: the result's own row list is cleared and replaced while
      // looping. A missing row list is treated as "no rows" instead of failing with an NPE.
      List<RowMetaAndData> rows = new ArrayList<>();
      if (result.getRows() != null) {
        rows.addAll(result.getRows());
      }

      boolean first = true;
      while ((first && !execPerRow)
          || (execPerRow && iteration < rows.size() && result.getNrErrors() == 0)) {
        first = false;

        // Clear the result rows of the result
        // Otherwise we double the amount of rows every iteration in the simple cases.
        //
        if (execPerRow) {
          result.getRows().clear();
        }

        // Only the per-row mode has a "current" row to take field values from.
        RowMetaAndData resultRow = execPerRow ? rows.get(iteration) : null;

        INamedParameters namedParam = buildSubWorkflowParameters(resultRow);

        List<RowMetaAndData> sourceRows;
        if (execPerRow) {
          // Execute for each input row: just pass a single row
          //
          sourceRows = new ArrayList<>();
          sourceRows.add(resultRow);
        } else {
          // Keep it as it was...
          //
          sourceRows = result.getRows();
        }

        // Create a new workflow
        //
        workflow =
            WorkflowEngineFactory.createWorkflowEngine(
                this, resolve(runConfiguration), getMetadataProvider(), workflowMeta, this);
        workflow.setParentWorkflow(parentWorkflow);
        workflow.setLogLevel(workflowLogLevel);
        workflow.shareWith(this);
        workflow.setResult(result);
        workflow.setInternalHopVariables();
        workflow.copyParametersFromDefinitions(workflowMeta);
        workflow.setInteractive(parentWorkflow.isInteractive());
        if (workflow.isInteractive()) {
          workflow.getActionListeners().addAll(parentWorkflow.getActionListeners());
        }

        // Set the parameters calculated above on this instance.
        //
        workflow.clearParameterValues();
        for (String parameterName : workflow.listParameters()) {
          // Grab the parameter value set in the action
          //
          String thisValue = namedParam.getParameterValue(parameterName);
          if (!Utils.isEmpty(thisValue)) {
            // Set the value as specified by the user in the action
            //
            workflow.setParameterValue(parameterName, thisValue);
          } else if (parameterDefinition.isPassingAllParameters()) {
            // No value from the action: fall back to the value set in the parent workflow,
            // but only when we opted to pass all parameters down.
            //
            String parentValue = parentWorkflow.getParameterValue(parameterName);
            if (!Utils.isEmpty(parentValue)) {
              workflow.setParameterValue(parameterName, parentValue);
            }
          }
        }
        workflow.activateParameters(workflow);

        // Set the source rows we calculated above...
        //
        workflow.setSourceRows(sourceRows);

        // Link the workflow with the sub-workflow
        parentWorkflow.getWorkflowTracker().addWorkflowTracker(workflow.getWorkflowTracker());

        // Link both ways!
        workflow.getWorkflowTracker().setParentWorkflowTracker(parentWorkflow.getWorkflowTracker());

        ActionWorkflowRunner runner =
            new ActionWorkflowRunner(workflow, result, nr, getLogChannel());
        Thread workflowRunnerThread = new Thread(runner);
        // added UUID to thread name, otherwise threads do share names if workflows actions are
        // executed in parallel in a parent workflow.
        // if that happens, contained pipelines start closing each other's connections
        workflowRunnerThread.setName(
            Const.NVL(
                    workflow.getWorkflowMeta().getName(), workflow.getWorkflowMeta().getFilename())
                + " UUID: "
                + UUID.randomUUID().toString());
        workflowRunnerThread.start();

        if (isWaitingToFinish()) {
          // Keep running until we're done.
          //
          while (!runner.isFinished() && !parentWorkflow.isStopped()) {
            try {
              Thread.sleep(0, 1);
            } catch (InterruptedException e) {
              // Ignore: the loop condition alone decides when we stop waiting.
            }
          }

          // if the parent-workflow was stopped, stop the sub-workflow too...
          if (parentWorkflow.isStopped()) {
            workflow.stopExecution();
            runner.waitUntilFinished(); // Wait until finished!
          }

          Result oneResult = runner.getResult();

          result.clear(); // clear only the numbers, NOT the files or rows.
          result.add(oneResult);

          // Set the result rows too, if any ...
          if (!Utils.isEmpty(oneResult.getRows())) {
            result.setRows(new ArrayList<>(oneResult.getRows()));
          }

          // if one of them fails (in the loop), increase the number of errors
          //
          if (!oneResult.getResult()) {
            result.setNrErrors(result.getNrErrors() + 1);
          }
        }

        iteration++;
      }
    } catch (HopException ke) {
      logError("Error running action 'workflow' : ", ke);

      result.setResult(false);
      result.setNrErrors(1L);
    } finally {
      // Always release the log file writer, including when an unchecked exception escapes.
      if (logChannelFileWriter != null) {
        logChannelFileWriter.stopLogging();
      }
    }

    if (setLogfile && logChannelFileWriter != null) {
      // Publish the log file as a result file of this action.
      ResultFile resultFile =
          new ResultFile(
              ResultFile.FILE_TYPE_LOG,
              logChannelFileWriter.getLogFile(),
              parentWorkflow.getWorkflowName(),
              getName());
      result.getResultFiles().put(resultFile.getFile().toString(), resultFile);

      // See if anything went wrong during file writing...
      //
      if (logChannelFileWriter.getException() != null) {
        logError("Unable to open log file [" + getLogFilename() + "] : ");
        logError(Const.getStackTracker(logChannelFileWriter.getException()));
        result.setNrErrors(1);
        result.setResult(false);
        return result;
      }
    }

    // The action succeeded only if no errors were counted.
    result.setResult(result.getNrErrors() <= 0);

    return result;
  }

  /**
   * Builds the named parameters for one run of the sub-workflow.
   *
   * <p>When {@code paramsFromPrevious} is set, every parameter of the parent workflow is copied
   * first (definition and value). Then each named parameter defined on this action is added if it
   * is not present yet and its value is set. With {@code paramsFromPrevious} set, the action's
   * values are applied a second time with {@code null} replaced by an empty string.
   *
   * @param resultRow the row to read field-based parameter values from, or {@code null} if none
   * @return the parameters to hand to the sub-workflow
   * @throws HopException if a parameter cannot be defined, set or read from the row
   */
  private INamedParameters buildSubWorkflowParameters(RowMetaAndData resultRow)
      throws HopException {
    INamedParameters namedParam = new NamedParameters();

    // First (optionally) copy all the parameter values from the parent workflow
    //
    if (paramsFromPrevious) {
      for (String par : parentWorkflow.listParameters()) {
        String def = parentWorkflow.getParameterDefault(par);
        String val = parentWorkflow.getParameterValue(par);
        String des = parentWorkflow.getParameterDescription(par);

        namedParam.addParameterDefinition(par, def, des);
        namedParam.setParameterValue(par, val);
      }
    }

    // Now add those parameter values specified by the user in the action
    //
    for (Parameter parameter : parameterDefinition.getParameters()) {
      if (Utils.isEmpty(parameter.getName())) {
        continue;
      }

      // If it's not yet present in the parent workflow, add it...
      //
      if (Const.indexOfString(parameter.getName(), namedParam.listParameters()) < 0) {
        try {
          namedParam.addParameterDefinition(parameter.getName(), "", "Action runtime");
        } catch (DuplicateParamException e) {
          // Should never happen
          //
          logError("Duplicate parameter definition for " + parameter.getName());
        }
      }

      namedParam.setParameterValue(
          parameter.getName(), computeActionParameterValue(parameter, resultRow));
    }

    // Copy the input to the parameters once more, this time never leaving a null value behind.
    if (paramsFromPrevious) {
      for (Parameter parameter : parameterDefinition.getParameters()) {
        if (!Utils.isEmpty(parameter.getName())) {
          namedParam.setParameterValue(
              parameter.getName(),
              Const.NVL(computeActionParameterValue(parameter, resultRow), ""));
        }
      }
    }

    return namedParam;
  }

  /**
   * Determines the value of one parameter defined on this action.
   *
   * @param parameter the parameter definition from this action
   * @param resultRow the row to read the field from, or {@code null} if there is none
   * @return the resolved static value when no field is configured; otherwise the field's string
   *     value from {@code resultRow}, or an empty string when there is no row
   * @throws HopException if the field value cannot be read from the row
   */
  private String computeActionParameterValue(Parameter parameter, RowMetaAndData resultRow)
      throws HopException {
    if (Utils.isEmpty(Const.trim(parameter.getField()))) {
      return Const.NVL(resolve(parameter.getValue()), "");
    }
    // something filled in, in the field column: get the value from the input row
    //
    if (resultRow == null) {
      return "";
    }
    return resultRow.getString(parameter.getField(), "");
  }