in plugins/actions/workflow/src/main/java/org/apache/hop/workflow/actions/workflow/ActionWorkflow.java [249:588]
public Result execute(Result result, int nr) throws HopException {
result.setEntryNr(nr);
LogChannelFileWriter logChannelFileWriter = null;
LogLevel workflowLogLevel = parentWorkflow.getLogLevel();
if (setLogfile) {
String realLogFilename = resolve(getLogFilename());
// We need to check here the log filename
// if we do not have one, we must fail
if (Utils.isEmpty(realLogFilename)) {
logError(BaseMessages.getString(PKG, "ActionWorkflow.Exception.LogFilenameMissing"));
result.setNrErrors(1);
result.setResult(false);
return result;
}
// create parent folder?
if (!createParentFolder(realLogFilename)) {
result.setNrErrors(1);
result.setResult(false);
return result;
}
try {
logChannelFileWriter =
new LogChannelFileWriter(
this.getLogChannelId(), HopVfs.getFileObject(realLogFilename), setAppendLogfile);
logChannelFileWriter.startLogging();
} catch (HopException e) {
logError(
"Unable to open file appender for file [" + getLogFilename() + "] : " + e.toString());
logError(Const.getStackTracker(e));
result.setNrErrors(1);
result.setResult(false);
return result;
}
workflowLogLevel = logFileLevel;
}
try {
// First load the workflow, outside of the loop...
if (parentWorkflow.getWorkflowMeta() != null) {
// reset the internal variables again.
// Maybe we should split up the variables even more like in UNIX shells.
// The internal variables need to be reset to be able use them properly
// in 2 sequential sub workflows.
parentWorkflow.getWorkflowMeta().setInternalHopVariables(this);
}
// Explain what we are loading...
//
logDetailed("Loading workflow from XML file : [" + resolve(filename) + "]");
WorkflowMeta workflowMeta = getWorkflowMeta(getMetadataProvider(), this);
// Verify that we loaded something, complain if we did not...
//
if (workflowMeta == null) {
throw new HopException("Unable to load the workflow: please specify a filename");
}
verifyRecursiveExecution(parentWorkflow, workflowMeta);
int iteration = 0;
copyFrom(parentWorkflow);
setParentVariables(parentWorkflow);
RowMetaAndData resultRow = null;
boolean first = true;
List<RowMetaAndData> rows = new ArrayList<>(result.getRows());
while ((first && !execPerRow)
|| (execPerRow && rows != null && iteration < rows.size() && result.getNrErrors() == 0)) {
first = false;
// Clear the result rows of the result
// Otherwise we double the amount of rows every iteration in the simple cases.
//
if (execPerRow) {
result.getRows().clear();
}
if (rows != null && execPerRow) {
resultRow = rows.get(iteration);
} else {
resultRow = null;
}
INamedParameters namedParam = new NamedParameters();
// First (optionally) copy all the parameter values from the parent workflow
//
if (paramsFromPrevious) {
String[] parentParameters = parentWorkflow.listParameters();
for (int idx = 0; idx < parentParameters.length; idx++) {
String par = parentParameters[idx];
String def = parentWorkflow.getParameterDefault(par);
String val = parentWorkflow.getParameterValue(par);
String des = parentWorkflow.getParameterDescription(par);
namedParam.addParameterDefinition(par, def, des);
namedParam.setParameterValue(par, val);
}
}
// Now add those parameter values specified by the user in the action
//
for (Parameter parameter : parameterDefinition.getParameters()) {
if (!Utils.isEmpty(parameter.getName())) {
// If it's not yet present in the parent workflow, add it...
//
if (Const.indexOfString(parameter.getName(), namedParam.listParameters()) < 0) {
// We have a parameter
try {
namedParam.addParameterDefinition(parameter.getName(), "", "Action runtime");
} catch (DuplicateParamException e) {
// Should never happen
//
logError("Duplicate parameter definition for " + parameter.getName());
}
}
if (Utils.isEmpty(Const.trim(parameter.getField()))) {
namedParam.setParameterValue(
parameter.getName(), Const.NVL(resolve(parameter.getValue()), ""));
} else {
// something filled in, in the field column...
//
String value = "";
if (resultRow != null) {
value = resultRow.getString(parameter.getField(), "");
}
namedParam.setParameterValue(parameter.getName(), value);
}
}
}
Result oneResult = new Result();
List<RowMetaAndData> sourceRows = null;
if (execPerRow) {
// Execute for each input row
// Just pass a single row
//
List<RowMetaAndData> newList = new ArrayList<>();
newList.add(resultRow);
sourceRows = newList;
if (paramsFromPrevious) { // Copy the input the parameters
for (Parameter parameter : parameterDefinition.getParameters()) {
if (!Utils.isEmpty(parameter.getName())) {
// We have a parameter
if (Utils.isEmpty(Const.trim(parameter.getField()))) {
namedParam.setParameterValue(
parameter.getName(), Const.NVL(resolve(parameter.getValue()), ""));
} else {
String fieldValue = "";
if (resultRow != null) {
fieldValue = resultRow.getString(parameter.getField(), "");
}
// Get the value from the input stream
namedParam.setParameterValue(parameter.getName(), Const.NVL(fieldValue, ""));
}
}
}
}
} else {
// Keep it as it was...
//
sourceRows = result.getRows();
if (paramsFromPrevious) { // Copy the input the parameters
for (Parameter parameter : parameterDefinition.getParameters()) {
if (!Utils.isEmpty(parameter.getName())) {
// We have a parameter
if (Utils.isEmpty(Const.trim(parameter.getField()))) {
namedParam.setParameterValue(
parameter.getName(), Const.NVL(resolve(parameter.getValue()), ""));
} else {
String fieldValue = "";
if (resultRow != null) {
fieldValue = resultRow.getString(parameter.getField(), "");
}
// Get the value from the input stream
namedParam.setParameterValue(parameter.getName(), Const.NVL(fieldValue, ""));
}
}
}
}
}
// Create a new workflow
//
workflow =
WorkflowEngineFactory.createWorkflowEngine(
this, resolve(runConfiguration), getMetadataProvider(), workflowMeta, this);
workflow.setParentWorkflow(parentWorkflow);
workflow.setLogLevel(workflowLogLevel);
workflow.shareWith(this);
workflow.setResult(result);
workflow.setInternalHopVariables();
workflow.copyParametersFromDefinitions(workflowMeta);
workflow.setInteractive(parentWorkflow.isInteractive());
if (workflow.isInteractive()) {
workflow.getActionListeners().addAll(parentWorkflow.getActionListeners());
}
// Set the parameters calculated above on this instance.
//
workflow.clearParameterValues();
String[] parameterNames = workflow.listParameters();
for (int idx = 0; idx < parameterNames.length; idx++) {
// Grab the parameter value set in the action
//
String thisValue = namedParam.getParameterValue(parameterNames[idx]);
if (!Utils.isEmpty(thisValue)) {
// Set the value as specified by the user in the action
//
workflow.setParameterValue(parameterNames[idx], thisValue);
} else {
// See if the parameter had a value set in the parent workflow...
// This value should pass down to the sub-workflow if that's what we
// opted to do.
//
if (parameterDefinition.isPassingAllParameters()) {
String parentValue = parentWorkflow.getParameterValue(parameterNames[idx]);
if (!Utils.isEmpty(parentValue)) {
workflow.setParameterValue(parameterNames[idx], parentValue);
}
}
}
}
workflow.activateParameters(workflow);
// Set the source rows we calculated above...
//
workflow.setSourceRows(sourceRows);
// Link the workflow with the sub-workflow
parentWorkflow.getWorkflowTracker().addWorkflowTracker(workflow.getWorkflowTracker());
// Link both ways!
workflow.getWorkflowTracker().setParentWorkflowTracker(parentWorkflow.getWorkflowTracker());
ActionWorkflowRunner runner =
new ActionWorkflowRunner(workflow, result, nr, getLogChannel());
Thread workflowRunnerThread = new Thread(runner);
// added UUID to thread name, otherwise threads do share names if workflows actions are
// executed in parallel in a
// parent workflow
// if that happens, contained pipelines start closing each other's connections
workflowRunnerThread.setName(
Const.NVL(
workflow.getWorkflowMeta().getName(), workflow.getWorkflowMeta().getFilename())
+ " UUID: "
+ UUID.randomUUID().toString());
workflowRunnerThread.start();
if (isWaitingToFinish()) {
// Keep running until we're done.
//
while (!runner.isFinished() && !parentWorkflow.isStopped()) {
try {
Thread.sleep(0, 1);
} catch (InterruptedException e) {
// Ignore
}
}
// if the parent-workflow was stopped, stop the sub-workflow too...
if (parentWorkflow.isStopped()) {
workflow.stopExecution();
runner.waitUntilFinished(); // Wait until finished!
}
oneResult = runner.getResult();
result.clear(); // clear only the numbers, NOT the files or rows.
result.add(oneResult);
// Set the result rows too, if any ...
if (!Utils.isEmpty(oneResult.getRows())) {
result.setRows(new ArrayList<>(oneResult.getRows()));
}
// if one of them fails (in the loop), increase the number of errors
//
if (oneResult.getResult() == false) {
result.setNrErrors(result.getNrErrors() + 1);
}
}
iteration++;
}
} catch (HopException ke) {
logError("Error running action 'workflow' : ", ke);
result.setResult(false);
result.setNrErrors(1L);
}
if (setLogfile) {
if (logChannelFileWriter != null) {
logChannelFileWriter.stopLogging();
ResultFile resultFile =
new ResultFile(
ResultFile.FILE_TYPE_LOG,
logChannelFileWriter.getLogFile(),
parentWorkflow.getWorkflowName(),
getName());
result.getResultFiles().put(resultFile.getFile().toString(), resultFile);
// See if anything went wrong during file writing...
//
if (logChannelFileWriter.getException() != null) {
logError("Unable to open log file [" + getLogFilename() + "] : ");
logError(Const.getStackTracker(logChannelFileWriter.getException()));
result.setNrErrors(1);
result.setResult(false);
return result;
}
}
}
if (result.getNrErrors() > 0) {
result.setResult(false);
} else {
result.setResult(true);
}
return result;
}